Repository: incubator-tephra Updated Branches: refs/heads/master 79b97198c -> abf34e5f4
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java new file mode 100644 index 0000000..560b0fe --- /dev/null +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tephra.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.Collections; +import java.util.List; + +/** + * Base class for tests that need a HBase cluster + */ +@SuppressWarnings("WeakerAccess") +public abstract class AbstractHBaseTableTest { + protected static HBaseTestingUtility testUtil; + protected static HBaseAdmin hBaseAdmin; + protected static Configuration conf; + + @BeforeClass + public static void startMiniCluster() throws Exception { + testUtil = conf == null ? 
new HBaseTestingUtility() : new HBaseTestingUtility(conf); + conf = testUtil.getConfiguration(); + + // Tune down the connection thread pool size + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 10); + // Tunn down handler threads in regionserver + conf.setInt("hbase.regionserver.handler.count", 10); + + // Set to random port + conf.setInt("hbase.master.port", 0); + conf.setInt("hbase.master.info.port", 0); + conf.setInt("hbase.regionserver.port", 0); + conf.setInt("hbase.regionserver.info.port", 0); + + testUtil.startMiniCluster(); + hBaseAdmin = testUtil.getHBaseAdmin(); + } + + @AfterClass + public static void shutdownMiniCluster() throws Exception { + try { + if (hBaseAdmin != null) { + hBaseAdmin.close(); + } + } finally { + testUtil.shutdownMiniCluster(); + } + } + + protected static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception { + return createTable(tableName, columnFamilies, false, + Collections.singletonList(TransactionProcessor.class.getName())); + } + + protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, + List<String> coprocessors) throws Exception { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + for (byte[] family : columnFamilies) { + HColumnDescriptor columnDesc = new HColumnDescriptor(family); + columnDesc.setMaxVersions(Integer.MAX_VALUE); + columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis + desc.addFamily(columnDesc); + } + if (existingData) { + desc.setValue(TxConstants.READ_NON_TX_DATA, "true"); + } + // Divide individually to prevent any overflow + int priority = Coprocessor.PRIORITY_USER; + // order in list is the same order that coprocessors will be invoked + for (String coprocessor : coprocessors) { + desc.addCoprocessor(coprocessor, null, ++priority, null); + } + hBaseAdmin.createTable(desc); + testUtil.waitTableAvailable(tableName, 5000); + 
return new HTable(testUtil.getConfiguration(), tableName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 6dc7c28..e2fadbd 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -19,21 +19,14 @@ package org.apache.tephra.hbase; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.OperationWithAttributes; @@ -76,6 +69,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -89,14 
+83,11 @@ import static org.junit.Assert.fail; /** * Tests for TransactionAwareHTables. */ -public class TransactionAwareHTableTest { +public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class); - private static HBaseTestingUtility testUtil; - private static HBaseAdmin hBaseAdmin; - private static TransactionStateStorage txStateStorage; - private static TransactionManager txManager; - private static Configuration conf; + static TransactionStateStorage txStateStorage; + static TransactionManager txManager; private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; @@ -146,23 +137,6 @@ public class TransactionAwareHTableTest { @BeforeClass public static void setupBeforeClass() throws Exception { - testUtil = new HBaseTestingUtility(); - conf = testUtil.getConfiguration(); - - // Tune down the connection thread pool size - conf.setInt("hbase.hconnection.threads.core", 5); - conf.setInt("hbase.hconnection.threads.max", 10); - // Tunn down handler threads in regionserver - conf.setInt("hbase.regionserver.handler.count", 10); - - // Set to random port - conf.setInt("hbase.master.port", 0); - conf.setInt("hbase.master.info.port", 0); - conf.setInt("hbase.regionserver.port", 0); - conf.setInt("hbase.regionserver.info.port", 0); - - testUtil.startMiniCluster(); - hBaseAdmin = testUtil.getHBaseAdmin(); txStateStorage = new InMemoryTransactionStateStorage(); txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); txManager.startAndWait(); @@ -170,8 +144,9 @@ public class TransactionAwareHTableTest { @AfterClass public static void shutdownAfterClass() throws Exception { - testUtil.shutdownMiniCluster(); - hBaseAdmin.close(); + if (txManager != null) { + txManager.stopAndWait(); + } } @Before @@ -186,34 +161,6 @@ public class TransactionAwareHTableTest { 
hBaseAdmin.disableTable(TestBytes.table); hBaseAdmin.deleteTable(TestBytes.table); } - - private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception { - return createTable(tableName, columnFamilies, false, Collections.<String>emptyList()); - } - - private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, - List<String> coprocessors) throws Exception { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - for (byte[] family : columnFamilies) { - HColumnDescriptor columnDesc = new HColumnDescriptor(family); - columnDesc.setMaxVersions(Integer.MAX_VALUE); - columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis - desc.addFamily(columnDesc); - } - if (existingData) { - desc.setValue(TxConstants.READ_NON_TX_DATA, "true"); - } - // Divide individually to prevent any overflow - int priority = Coprocessor.PRIORITY_USER; - desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null); - // order in list is the same order that coprocessors will be invoked - for (String coprocessor : coprocessors) { - desc.addCoprocessor(coprocessor, null, ++priority, null); - } - hBaseAdmin.createTable(desc); - testUtil.waitTableAvailable(tableName, 5000); - return new HTable(testUtil.getConfiguration(), tableName); - } /** * Test transactional put and get requests. 
@@ -409,7 +356,7 @@ public class TransactionAwareHTableTest { public void testAttributesPreserved() throws Exception { HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"), new byte[][]{TestBytes.family, TestBytes.family2}, false, - Lists.newArrayList(TestRegionObserver.class.getName())); + Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName())); try { TransactionAwareHTable txTable = new TransactionAwareHTable(hTable); TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); @@ -1123,7 +1070,7 @@ public class TransactionAwareHTableTest { TransactionAwareHTable txTable = new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, - Collections.<String>emptyList())); + Collections.singletonList(TransactionProcessor.class.getName()))); TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); // Add some pre-existing, non-transactional data @@ -1272,8 +1219,9 @@ public class TransactionAwareHTableTest { @Test public void testVisibilityAll() throws Exception { - HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"), - new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList()); + HTable nonTxTable = + createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2}, + true, Collections.singletonList(TransactionProcessor.class.getName())); TransactionAwareHTable txTable = new TransactionAwareHTable(nonTxTable, TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes @@ -1549,6 +1497,66 @@ public class TransactionAwareHTableTest { transactionContext.finish(); } + @Test + public void testTxLifetime() throws Exception { + // Add some initial values + transactionContext.start(); + Put put = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + 
transactionAwareHTable.put(put); + put = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value); + transactionAwareHTable.put(put); + transactionContext.finish(); + + // Simulate writing with a transaction past its max lifetime + transactionContext.start(); + Transaction currentTx = transactionContext.getCurrentTransaction(); + Assert.assertNotNull(currentTx); + + // Create a transaction that is past the max lifetime + long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + long oldTxId = currentTx.getTransactionId() - ((txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS); + Transaction oldTx = new Transaction(currentTx.getReadPointer(), oldTxId, + currentTx.getInvalids(), currentTx.getInProgress(), + currentTx.getFirstShortInProgress()); + transactionAwareHTable.updateTx(oldTx); + // Put with the old transaction should fail + put = new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + try { + transactionAwareHTable.put(put); + Assert.fail("Excepted exception with old transaction!"); + } catch (IOException e) { + // Expected exception + } + + // Delete with the old transaction should also fail + Delete delete = new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier); + try { + transactionAwareHTable.delete(delete); + Assert.fail("Excepted exception with old transaction!"); + } catch (IOException e) { + // Expected exception + } + + // Now update the table to use the current transaction + transactionAwareHTable.updateTx(currentTx); + put = new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value); + transactionAwareHTable.put(put); + delete = new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2); + transactionAwareHTable.delete(delete); + + // Verify values with the same transaction since we cannot commit the old transaction + 
verifyRow(transactionAwareHTable, + new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), TestBytes.value); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), null); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), TestBytes.value); + transactionContext.finish(); + } + /** * Tests that transaction co-processor works with older clients * http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java new file mode 100644 index 0000000..402892f --- /dev/null +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + + +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.ImmutableSortedSet; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractHBaseTableTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Test methods of {@link DataJanitorState} + */ +// TODO: Group all the tests that need HBase mini cluster into a suite, so that we start the mini-cluster only once +public class DataJanitorStateTest extends AbstractHBaseTableTest { + + private TableName pruneStateTable; + private DataJanitorState dataJanitorState; + + @Before + public void beforeTest() throws Exception { + pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false, + // Prune state table is a non-transactional table, hence no transaction co-processor + Collections.<String>emptyList()); + table.close(); + + dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + + } + + @After + public void afterTest() throws Exception { + hBaseAdmin.disableTable(pruneStateTable); + hBaseAdmin.deleteTable(pruneStateTable); + } + + @Test + public void 
testSavePruneUpperBound() throws Exception { + int max = 20; + + // Nothing should be present in the beginning + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L))); + + // Save some region - prune upper bound values + // We should have values for regions 0, 2, 4, 6, ..., max-2 after this + for (long i = 0; i < max; i += 2) { + dataJanitorState.savePruneUpperBoundForRegion(Bytes.toBytes(i), i); + } + + Assert.assertEquals(10L, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L))); + + // Verify all the saved values + for (long i = 0; i < max; ++i) { + long expected = i % 2 == 0 ? i : -1; + Assert.assertEquals(expected, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(i))); + } + // Regions not present should give -1 + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(max + 50L))); + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes((max + 10L) * -1))); + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(3L))); + + SortedSet<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Map<byte[], Long> expectedMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (long i = 0; i < max; ++i) { + allRegions.add(Bytes.toBytes(i)); + if (i % 2 == 0) { + expectedMap.put(Bytes.toBytes(i), i); + } + } + Assert.assertEquals(max / 2, expectedMap.size()); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions)); + + SortedSet<byte[]> regions = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR) + .add(Bytes.toBytes((max + 20L) * -1)) + .add(Bytes.toBytes(6L)) + .add(Bytes.toBytes(15L)) + .add(Bytes.toBytes(18L)) + .add(Bytes.toBytes(max + 33L)) + .build(); + expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR) + .put(Bytes.toBytes(6L), 6L) + .put(Bytes.toBytes(18L), 18L) + .build(); + Assert.assertEquals(expectedMap, 
dataJanitorState.getPruneUpperBoundForRegions(regions)); + + // Delete regions that have prune upper bound before 15 and not in set (4, 8) + ImmutableSortedSet<byte[]> excludeRegions = + ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR).add(Bytes.toBytes(4L)).add(Bytes.toBytes(8L)).build(); + dataJanitorState.deletePruneUpperBounds(15, excludeRegions); + // Regions 0, 2, 6 and 10 should have been deleted now + expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR) + .put(Bytes.toBytes(4L), 4L) + .put(Bytes.toBytes(8L), 8L) + .put(Bytes.toBytes(16L), 16L) + .put(Bytes.toBytes(18L), 18L) + .build(); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions)); + } + + @Test + public void testSaveRegionTime() throws Exception { + int maxTime = 100; + + // Nothing should be present in the beginning + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(maxTime)); + + // Save regions for time + Map<Long, SortedSet<byte[]>> regionsTime = new TreeMap<>(); + for (long time = 0; time < maxTime; time += 10) { + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (long region = 0; region < 10; region += 2) { + regions.add(Bytes.toBytes((time * 10) + region)); + } + regionsTime.put(time, regions); + dataJanitorState.saveRegionsForTime(time, regions); + } + + // Verify saved regions + Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); + Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31)); + Assert.assertEquals(new TimeRegions(90, regionsTime.get(90L)), + dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000)); + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10)); + + // Delete regions saved on or before time 30 + 
dataJanitorState.deleteAllRegionsOnOrBeforeTime(30); + // Values on or before time 30 should be deleted + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Values after time 30 should still exist + Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40)); + } + + @Test + public void testSaveInactiveTransactionBoundTime() throws Exception { + int maxTime = 100; + + // Nothing sould be present in the beginning + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); + + // Save inactive transaction bounds for various time values + for (long time = 0; time < maxTime; time += 10) { + dataJanitorState.saveInactiveTransactionBoundForTime(time, time + 2); + } + + // Verify written values + Assert.assertEquals(2, dataJanitorState.getInactiveTransactionBoundForTime(0)); + Assert.assertEquals(12, dataJanitorState.getInactiveTransactionBoundForTime(10)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(15)); + Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(maxTime + 100)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime((maxTime + 55) * -1L)); + + // Delete values saved on or before time 20 + dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(20); + // Values on or before time 20 should be deleted + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(0)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(20)); + // Values after time 20 should still exist + Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30)); + Assert.assertEquals(92, 
dataJanitorState.getInactiveTransactionBoundForTime(90)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java new file mode 100644 index 0000000..310c710 --- /dev/null +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.ImmutableSortedSet; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.TransactionStateCache; +import org.apache.tephra.hbase.AbstractHBaseTableTest; +import org.apache.tephra.hbase.TransactionAwareHTable; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.txprune.TransactionPruningPlugin; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; 
+ +/** + * Test invalid list pruning + */ +public class InvalidListPruneTest extends AbstractHBaseTableTest { + private static final byte[] family = Bytes.toBytes("f1"); + private static final byte[] qualifier = Bytes.toBytes("col1"); + private static final int MAX_ROWS = 1000; + + private static TableName txDataTable1; + private static TableName pruneStateTable; + + // Override AbstractHBaseTableTest.startMiniCluster to setup configuration + @BeforeClass + public static void startMiniCluster() throws Exception { + // Setup the configuration to start HBase cluster with the invalid list pruning enabled + conf = HBaseConfiguration.create(); + conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); + AbstractHBaseTableTest.startMiniCluster(); + + TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage(); + TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); + txManager.startAndWait(); + + // Do some transactional data operations + txDataTable1 = TableName.valueOf("invalidListPruneTestTable1"); + HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) { + TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); + txContext.start(); + for (int i = 0; i < MAX_ROWS; ++i) { + txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i))); + } + txContext.finish(); + } + + testUtil.flush(txDataTable1); + txManager.stopAndWait(); + + pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + } + + @AfterClass + public static void shutdownAfterClass() throws Exception { + hBaseAdmin.disableTable(txDataTable1); + 
hBaseAdmin.deleteTable(txDataTable1);
  }

  // NOTE(review): the method closed above, and the members referenced throughout
  // (testUtil, hBaseAdmin, conf, txDataTable1, pruneStateTable, createTable(...)),
  // are declared earlier in this class or its base class — outside this chunk.

  /** Per-test setup: fresh prune-state table, and no transaction snapshot installed. */
  @Before
  public void beforeTest() throws Exception {
    createPruneStateTable();
    InMemoryTransactionStateCache.setTransactionSnapshot(null);
  }

  /** Creates the prune-state table with the {@link DataJanitorState#FAMILY} column family. */
  private void createPruneStateTable() throws Exception {
    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
                               // Prune state table is a non-transactional table, hence no transaction co-processor
                               Collections.<String>emptyList());
    table.close();
  }

  /** Per-test teardown: drop the prune-state table so tests stay independent. */
  @After
  public void afterTest() throws Exception {
    deletePruneStateTable();
  }

  /** Disables and deletes the prune-state table if present (safe to call when absent). */
  private void deletePruneStateTable() throws Exception {
    if (hBaseAdmin.tableExists(pruneStateTable)) {
      hBaseAdmin.disableTable(pruneStateTable);
      hBaseAdmin.deleteTable(pruneStateTable);
    }
  }

  /**
   * Verifies that a region's prune upper bound is recorded only on major compaction
   * (minor compaction leaves it untouched), does not move when the same snapshot is
   * compacted again, and advances once a newer snapshot with in-progress transactions
   * is installed (105 in progress => upper bound 104 below).
   */
  @Test
  public void testRecordCompactionState() throws Exception {
    DataJanitorState dataJanitorState =
      new DataJanitorState(new DataJanitorState.TableSupplier() {
        @Override
        public Table get() throws IOException {
          return testUtil.getConnection().getTable(pruneStateTable);
        }
      });

    // No prune upper bound initially
    Assert.assertEquals(-1,
      dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));

    // Create a new transaction snapshot
    InMemoryTransactionStateCache.setTransactionSnapshot(
      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
    // Run minor compaction
    testUtil.compact(txDataTable1, false);
    // No prune upper bound after minor compaction too
    Assert.assertEquals(-1,
      dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));

    // Run major compaction, and verify prune upper bound
    testUtil.compact(txDataTable1, true);
    Assert.assertEquals(50,
      dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));

    // Run major compaction again with same snapshot, prune upper bound should not change
    testUtil.compact(txDataTable1, true);
    Assert.assertEquals(50,
      dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));

    // Create a new transaction snapshot
    InMemoryTransactionStateCache.setTransactionSnapshot(
      new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
                              ImmutableSortedMap.of(105L, new TransactionManager.InProgressTx(
                                100, 30, TransactionManager.InProgressType.SHORT))));
    // Snapshot alone does not move the bound — only a major compaction records it.
    Assert.assertEquals(50,
      dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));

    // Run major compaction again, now prune upper bound should change
    testUtil.compact(txDataTable1, true);
    Assert.assertEquals(104,
      dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
  }

  /**
   * Major compaction must still succeed when the prune-state table is absent; success is
   * observed via {@link TestTransactionProcessor#lastMajorCompactionTime}.
   */
  @Test
  public void testRecordCompactionStateNoTable() throws Exception {
    // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table
    // and make sure a major compaction succeeds
    deletePruneStateTable();

    // Create a new transaction snapshot
    InMemoryTransactionStateCache.setTransactionSnapshot(
      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
    // Run major compaction, and verify it completes
    long now = System.currentTimeMillis();
    testUtil.compact(txDataTable1, true);
    long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
    Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                      lastMajorCompactionTime >= now);
  }

  /** Major compaction must still succeed when no transaction snapshot is available. */
  @Test
  public void testRecordCompactionStateNoTxSnapshot() throws Exception {
    // Test recording state without having a transaction snapshot to make sure we don't disrupt
    // major compaction in that case
    InMemoryTransactionStateCache.setTransactionSnapshot(null);
    // Run major compaction, and verify it completes
    long now = System.currentTimeMillis();
    testUtil.compact(txDataTable1, true);
    long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
    Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime),
                      lastMajorCompactionTime >= now);
  }

  /**
   * End-to-end exercise of the pruning plugin: fetchPruneUpperBound / pruneComplete first
   * without any snapshot (bound stays -1), then after installing a snapshot and running a
   * major compaction (bound becomes the snapshot's write pointer), verifying both the
   * returned bounds and the persisted DataJanitorState, including cleanup of old entries.
   */
  @Test
  public void testPruneUpperBound() throws Exception {
    DataJanitorState dataJanitorState =
      new DataJanitorState(new DataJanitorState.TableSupplier() {
        @Override
        public Table get() throws IOException {
          return testUtil.getConnection().getTable(pruneStateTable);
        }
      });

    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
    transactionPruningPlugin.initialize(conf);
    try {
      // Run without a transaction snapshot first
      long now1 = 200;
      long inactiveTxTimeNow1 = 150 * TxConstants.MAX_TX_PER_MS;
      long expectedPruneUpperBound1 = -1;
      // fetch prune upper bound
      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);

      TimeRegions expectedRegions1 =
        new TimeRegions(now1,
                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
                          .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                          .build());
      // Assert prune state is recorded correctly
      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
      Assert.assertEquals(expectedPruneUpperBound1,
                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));

      // Run prune complete
      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);

      // Assert prune state was cleaned up correctly based on the prune time
      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
      Assert.assertEquals(expectedPruneUpperBound1,
                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));

      // Create a new transaction snapshot, and run major compaction on txDataTable1
      // And run all assertions again
      long now2 = 300;
      long inactiveTxTimeNow2 = 250 * TxConstants.MAX_TX_PER_MS;
      long expectedPruneUpperBound2 = 200 * TxConstants.MAX_TX_PER_MS;
      InMemoryTransactionStateCache.setTransactionSnapshot(
        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
                                ImmutableSet.of(expectedPruneUpperBound2),
                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
      TimeRegions expectedRegions2 =
        new TimeRegions(now2,
                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
                          .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                          .build());
      testUtil.compact(txDataTable1, true);
      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);

      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
      Assert.assertEquals(expectedPruneUpperBound2,
                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
      // Older (now1) entries survive until pruneComplete(now2, ...) runs below.
      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));

      transactionPruningPlugin.pruneComplete(now2, pruneUpperBound2);
      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
      Assert.assertEquals(expectedPruneUpperBound2,
                          dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0))));
      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
      // now1-era state is gone after the second pruneComplete.
      Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(now1));
      Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getInactiveTransactionBoundForTime(now1));

    } finally {
      transactionPruningPlugin.destroy();
    }
  }

  /** Returns the region name of the region hosting {@code row} in {@code dataTable}. */
  private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
    HRegionLocation regionLocation =
      testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
    return regionLocation.getRegionInfo().getRegionName();
  }

  /**
   * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
   */
  @SuppressWarnings("WeakerAccess")
  public static class TestTransactionProcessor extends TransactionProcessor {
    // Set on every postCompact; tests read it to verify a compaction ran to completion.
    private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);

    @Override
    protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
      // Substitute the in-memory cache so tests control the visible snapshot directly.
      return new Supplier<TransactionStateCache>() {
        @Override
        public TransactionStateCache get() {
          return new InMemoryTransactionStateCache();
        }
      };
    }

    @Override
    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
                            CompactionRequest request) throws IOException {
      super.postCompact(e, store, resultFile, request);
      lastMajorCompactionTime.set(System.currentTimeMillis());
    }
  }

  /**
   * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing
   */
  @SuppressWarnings("WeakerAccess")
  public static class InMemoryTransactionStateCache extends TransactionStateCache {
    // Static on purpose: shared by every cache instance the co-processor creates,
    // so tests can swap the snapshot from outside the region server.
    private static TransactionVisibilityState transactionSnapshot;

    public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
      InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
    }

    @Override
    protected void startUp() throws Exception {
      // Nothing to do
    }

    @Override
    protected void shutDown() throws Exception {
      // Nothing to do
    }

    @Override
    public TransactionVisibilityState getLatestState() {
      return transactionSnapshot;
    }
  }

  /** Pruning plugin that recognizes tables carrying {@link TestTransactionProcessor} as transactional. */
  @SuppressWarnings("WeakerAccess")
  public static class TestTransactionPruningPlugin extends HBaseTransactionPruningPlugin {
    @Override
    protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
      return tableDescriptor.hasCoprocessor(TestTransactionProcessor.class.getName());
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/abf34e5f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index e495692..ceffa4c 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -308,8 +308,7 @@ public class TransactionProcessor extends BaseRegionObserver {
       compactionState.record(request, snapshot);
     }
     // Also make sure to use the same snapshot for the compaction
-    return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners,
-                              scanType, earliestPutTs);
+    return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
   }

   @Override
