http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java new file mode 100644 index 0000000..558adaa --- /dev/null +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/CellSkipFilterTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase.coprocessor; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * HBase 1.3 specific test for {@link CellSkipFilter}. 
+ */ +public class CellSkipFilterTest { + + private static final String ROW1KEY = "row1"; + private static final String ROW2KEY = "row2"; + private static final String FAM1KEY = "fam1"; + private static final String COL1KEY = "col1"; + private static final String FAM2KEY = "fam2"; + private static final String COL2KEY = "col2"; + private static final String VALUE = "value"; + + @Test + public void testSkipFiltering() throws Exception { + long timestamp = System.currentTimeMillis(); + // Test that we get NEXT_COL once INCLUDE_AND_NEXT_COL has been returned for the same key + Filter filter = new CellSkipFilter(new MyFilter(0)); + assertEquals(Filter.ReturnCode.INCLUDE, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE, + timestamp))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, + VALUE, timestamp - 1))); + + // Next call should get NEXT_COL instead of SKIP, as it would be returned by CellSkipFilter + assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE, + timestamp - 2))); + + // Next call with the same key should return NEXT_COL again, as it would be returned by CellSkipFilter + assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE, + timestamp - 3))); + + // Since MyFilter's counter was not incremented by the previous calls, filtering a different keyvalue should + // give SKIP from MyFilter + assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM2KEY, COL1KEY, VALUE, + timestamp - 4))); + + // Test that we get NEXT_COL once NEXT_COL has been returned for the same key + filter = new CellSkipFilter(new MyFilter(2)); + assertEquals(Filter.ReturnCode.SKIP, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE, + timestamp))); + assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE, + timestamp - 1))); + + // Next call should get NEXT_COL instead of NEXT_ROW, as it would be returned by CellSkipFilter + assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE, + timestamp - 2))); + + // Next call with the same key should return NEXT_COL again, as it would be returned by CellSkipFilter + assertEquals(Filter.ReturnCode.NEXT_COL, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL1KEY, VALUE, + timestamp - 3))); + + // Since MyFilter's counter was not incremented by the previous calls, filtering a different keyvalue should + // give NEXT_ROW from MyFilter + assertEquals(Filter.ReturnCode.NEXT_ROW, filter.filterKeyValue(newKeyValue(ROW1KEY, FAM1KEY, COL2KEY, VALUE, + timestamp - 4))); + + // Next call with a new key should return SEEK_NEXT_USING_HINT + assertEquals(Filter.ReturnCode.SEEK_NEXT_USING_HINT, filter.filterKeyValue(newKeyValue(ROW2KEY, FAM1KEY, COL1KEY, + VALUE, timestamp - 5))); + } + + private KeyValue newKeyValue(String rowkey, String family, String column, String value, long timestamp) { + return new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes(family), Bytes.toBytes(column), + timestamp, Bytes.toBytes(value)); + } + + /** + * Sample filter for testing. This filter maintains a {@link List} of {@link ReturnCode}s. It accepts a + * start index into the list and serves the return codes starting at that index. Every time a + * return code is served, the index is incremented. 
+ */ + class MyFilter extends FilterBase { + + private final List<ReturnCode> returnCodes; + private int counter; + + public MyFilter(int startIndex) { + returnCodes = Arrays.asList(ReturnCode.INCLUDE, ReturnCode.INCLUDE_AND_NEXT_COL, ReturnCode.SKIP, + ReturnCode.NEXT_COL, ReturnCode.NEXT_ROW, ReturnCode.SEEK_NEXT_USING_HINT); + counter = startIndex; + } + + @Override + public ReturnCode filterKeyValue(Cell cell) throws IOException { + ReturnCode code = returnCodes.get(counter % returnCodes.size()); + counter++; + return code; + } + } +}
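Note on the behavior this test pins down: CellSkipFilter exists to avoid re-evaluating the wrapped filter for cells it has already decided to move past. Once the wrapped filter answers NEXT_COL or INCLUDE_AND_NEXT_COL for a cell, later cells with the same row/family/qualifier get NEXT_COL directly, without the wrapped filter being invoked; that is why MyFilter's counter does not advance on those calls. A minimal sketch of this wrapper pattern, assuming the HBase 1.x filter API (an illustration only, not Tephra's actual CellSkipFilter source):

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterBase;

    /** Illustrative wrapper that mirrors the behavior the test asserts. */
    class SkipToNextColumnFilter extends FilterBase {
      private final Filter delegate;
      private KeyValue skipColumn = null;  // column we have decided to move past

      SkipToNextColumnFilter(Filter delegate) {
        this.delegate = delegate;
      }

      @Override
      public ReturnCode filterKeyValue(Cell cell) throws IOException {
        // Same row/family/qualifier as a column already answered with a
        // *_NEXT_COL code: short-circuit to NEXT_COL without consulting the
        // delegate again.
        if (skipColumn != null
            && CellUtil.matchingRow(cell, skipColumn)
            && CellUtil.matchingFamily(cell, skipColumn)
            && CellUtil.matchingQualifier(cell, skipColumn)) {
          return ReturnCode.NEXT_COL;
        }
        ReturnCode code = delegate.filterKeyValue(cell);
        if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
          // Remember the column so remaining versions of it are skipped cheaply.
          skipColumn = KeyValueUtil.createFirstOnRow(CellUtil.cloneRow(cell),
              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
        } else {
          skipColumn = null;
        }
        return code;
      }
    }

In the MyFilter(2) case above, the wrapper's first two answers come straight from the delegate (SKIP, then NEXT_COL), after which the short-circuit kicks in until a different column or row arrives.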
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java new file mode 100644 index 0000000..15842a3 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase.coprocessor; + +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MockRegionServerServices; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tephra.ChangeId; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.TransactionStateCache; +import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.manager.InvalidTxList; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.snapshot.DefaultSnapshotCodec; +import 
org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.util.TxUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor. + */ +public class TransactionProcessorTest { + private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class); + + // 8 versions, 1 hour apart, latest is current ts. + private static final long[] V; + + static { + long now = System.currentTimeMillis(); + V = new long[9]; + for (int i = 0; i < V.length; i++) { + V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS; + } + } + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + private static MiniDFSCluster dfsCluster; + private static Configuration conf; + private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]}); + private static TransactionVisibilityState txVisibilityState; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration hConf = new Configuration(); + String rootDir = tmpFolder.newFolder().getAbsolutePath(); + hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir); + hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase"); + + dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); + dfsCluster.waitActive(); + conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf()); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); + String localTestDir = tmpFolder.newFolder().getAbsolutePath(); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + + // write an initial transaction snapshot + InvalidTxList invalidTxList = new InvalidTxList(); + invalidTxList.addAll(invalidSet); + TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( + System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList, + // this will set visibility upper bound to V[6] + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( + V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), + new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); + txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), + txSnapshot.getWritePointer(), txSnapshot.getInvalid(), + txSnapshot.getInProgress()); + HDFSTransactionStateStorage tmpStorage = + new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); + tmpStorage.startAndWait(); + tmpStorage.writeSnapshot(txSnapshot); + tmpStorage.stopAndWait(); + } + + @AfterClass + public static void shutdownAfterClass() throws Exception { + 
dfsCluster.shutdown(); + } + + @Test + public void testDataJanitorRegionScanner() throws Exception { + String tableName = "TestRegionScanner"; + byte[] familyBytes = Bytes.toBytes("f"); + byte[] columnBytes = Bytes.toBytes("c"); + HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3)); + try { + region.initialize(); + TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); + LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + + for (int i = 1; i <= 8; i++) { + for (int k = 1; k <= i; k++) { + Put p = new Put(Bytes.toBytes(i)); + p.add(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k])); + region.put(p); + } + } + + List<Cell> results = Lists.newArrayList(); + + // force a flush to clear the data + // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set + + LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString()); + region.flushcache(true, false); + + // now a normal scan should only return the valid rows + // do not use a filter here to test that cleanup works on flush + Scan scan = new Scan(); + scan.setMaxVersions(10); + RegionScanner regionScanner = region.getScanner(scan); + + // first returned value should be "4" with version "4" + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 4, new long[]{V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 5, new long[] {V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 6, new long[]{V[6], V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 7, new long[]{V[6], V[4]}); + + results.clear(); + assertFalse(regionScanner.next(results)); + assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]}); + } finally { + region.close(); + } + } + + @Test + public void testDeleteFiltering() throws Exception { + String tableName = "TestDeleteFiltering"; + byte[] familyBytes = Bytes.toBytes("f"); + byte[] columnBytes = Bytes.toBytes("c"); + HRegion region = createRegion(tableName, familyBytes, 0); + try { + region.initialize(); + TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); + LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + + byte[] row = Bytes.toBytes(1); + for (int i = 4; i < V.length; i++) { + Put p = new Put(row); + p.add(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i])); + region.put(p); + } + + // delete from the third entry back + // take that cell's timestamp + 1 to simulate a delete in a new tx + long deleteTs = V[5] + 1; + Delete d = new Delete(row, deleteTs); + LOG.info("Issuing delete at timestamp " + deleteTs); + // row deletes are not yet supported (TransactionAwareHTable normally handles this) + d.deleteColumns(familyBytes, columnBytes); + region.delete(d); + + List<Cell> results = Lists.newArrayList(); + + // force a flush to clear the data + // during flush, we should drop the deleted version, but not the others + LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString()); + region.flushcache(true, false); + + // now a normal scan should return row with versions at: V[8], V[6]. + // V[7] is invalid and V[5] and prior are deleted. 
+ Scan scan = new Scan(); + scan.setMaxVersions(10); + RegionScanner regionScanner = region.getScanner(scan); + // should be only one row + assertFalse(regionScanner.next(results)); + assertKeyValueMatches(results, 1, + new long[]{V[8], V[6], deleteTs}, + new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]}); + } finally { + region.close(); + } + } + + @Test + public void testDeleteMarkerCleanup() throws Exception { + String tableName = "TestDeleteMarkerCleanup"; + byte[] familyBytes = Bytes.toBytes("f"); + HRegion region = createRegion(tableName, familyBytes, 0); + try { + region.initialize(); + + // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal + long writeTs = txVisibilityState.getVisibilityUpperBound() - 10; + // deletes are performed after the writes, but still before the visibility upper bound + long deleteTs = writeTs + 1; + // write separate columns to confirm that delete markers survive across flushes + byte[] row = Bytes.toBytes(100); + Put p = new Put(row); + + LOG.info("Writing columns at timestamp " + writeTs); + for (int i = 0; i < 5; i++) { + byte[] iBytes = Bytes.toBytes(i); + p.add(familyBytes, iBytes, writeTs, iBytes); + } + region.put(p); + // read all back + Scan scan = new Scan(row); + RegionScanner regionScanner = region.getScanner(scan); + List<Cell> results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + for (int i = 0; i < 5; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + + // force a flush to clear the memstore + LOG.info("Before delete, flushing region " + region.getRegionInfo().getRegionNameAsString()); + region.flushcache(false, false); + + // delete the odd entries + for (int i = 0; i < 5; i++) { + if (i % 2 == 1) { + // deletes are performed as puts with empty values + Put deletePut = new Put(row); + deletePut.add(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]); + region.put(deletePut); + } + } + + // read all back + scan = new Scan(row); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN)); + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + LOG.info("Got cell " + cell); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + + // force another flush on the delete markers + // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction + LOG.info("After delete, flushing region " + region.getRegionInfo().getRegionNameAsString()); + region.flushcache(true, false); + + scan = new Scan(row); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN)); + + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = 
results.get(i); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + + // force a major compaction + LOG.info("Forcing major compaction of region " + region.getRegionInfo().getRegionNameAsString()); + region.compact(true); + + // perform a raw scan (no filter) to confirm that the delete markers are now gone + scan = new Scan(row); + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + } finally { + region.close(); + } + } + + /** + * Test that we correctly preserve the timestamp set for column family delete markers. This is not + * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures + * that we make it easy to interoperate with other systems. + */ + @Test + public void testFamilyDeleteTimestamp() throws Exception { + String tableName = "TestFamilyDeleteTimestamp"; + byte[] family1Bytes = Bytes.toBytes("f1"); + byte[] columnBytes = Bytes.toBytes("c"); + byte[] rowBytes = Bytes.toBytes("row"); + byte[] valBytes = Bytes.toBytes("val"); + HRegion region = createRegion(tableName, family1Bytes, 0); + try { + region.initialize(); + + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + Put p = new Put(rowBytes); + p.add(family1Bytes, columnBytes, now - 10, valBytes); + region.put(p); + + // issue a family delete with an explicit timestamp + Delete delete = new Delete(rowBytes, now); + delete.deleteFamily(family1Bytes, now - 5); + region.delete(delete); + + // test that the delete marker preserved the timestamp + Scan scan = new Scan(); + scan.setMaxVersions(); + RegionScanner scanner = region.getScanner(scan); + List<Cell> results = Lists.newArrayList(); + scanner.next(results); + assertEquals(2, results.size()); + // delete marker should appear first + Cell cell = results.get(0); + assertArrayEquals(new byte[0], cell.getQualifier()); + assertArrayEquals(new byte[0], cell.getValue()); + assertEquals(now - 5, cell.getTimestamp()); + // since this is an unfiltered scan against the region, the original put should be next + cell = results.get(1); + assertArrayEquals(valBytes, cell.getValue()); + assertEquals(now - 10, cell.getTimestamp()); + scanner.close(); + + + // with a filtered scan the original put should disappear + scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN)); + scanner = region.getScanner(scan); + results = Lists.newArrayList(); + scanner.next(results); + assertEquals(0, results.size()); + scanner.close(); + } finally { + region.close(); + } + } + + @Test + public void testPreExistingData() throws Exception { + String tableName = "TestPreExistingData"; + byte[] familyBytes = Bytes.toBytes("f"); + long ttlMillis = TimeUnit.DAYS.toMillis(14); + HRegion region = createRegion(tableName, familyBytes, ttlMillis); + try { + region.initialize(); + + // timestamps for pre-existing, non-transactional data + long now = txVisibilityState.getVisibilityUpperBound() / 
TxConstants.MAX_TX_PER_MS; + long older = now - ttlMillis / 2; + long newer = now - ttlMillis / 3; + // timestamps for transactional data + long nowTx = txVisibilityState.getVisibilityUpperBound(); + long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS; + long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS; + + Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + ttls.put(familyBytes, ttlMillis); + + List<Cell> cells = new ArrayList<>(); + cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11"))); + cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12"))); + cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21"))); + cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22"))); + cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31"))); + cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32"))); + + // Write non-transactional and transactional data + for (Cell c : cells) { + region.put(new Put(c.getRow()).add(c.getFamily(), c.getQualifier(), c.getTimestamp(), c.getValue())); + } + + Scan rawScan = new Scan(); + rawScan.setMaxVersions(); + + Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState); + Scan txScan = new Scan(); + txScan.setMaxVersions(); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // read all back with raw scanner + scanAndAssert(region, cells, rawScan); + + // read all back with transaction filter + scanAndAssert(region, cells, txScan); + + // force a flush to clear the memstore + region.flushcache(true, false); + scanAndAssert(region, cells, txScan); + + // force a major compaction to remove any expired cells + region.compact(true); + scanAndAssert(region, cells, txScan); + + // Reduce TTL, this should make cells with timestamps older and olderTx expire + long newTtl = ttlMillis / 2 - 1; + region = updateTtl(region, familyBytes, newTtl); + ttls.put(familyBytes, newTtl); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // Raw scan should still give all cells + scanAndAssert(region, cells, rawScan); + // However, tx scan should not return expired cells + scanAndAssert(region, select(cells, 1, 3, 5), txScan); + + region.flushcache(true, false); + scanAndAssert(region, cells, rawScan); + + // force a major compaction to remove any expired cells + region.compact(true); + // This time raw scan too should not return expired cells, as they would be dropped during major compaction + scanAndAssert(region, select(cells, 1, 3, 5), rawScan); + + // Reduce TTL again to 1 ms, this should expire all cells + newTtl = 1; + region = updateTtl(region, familyBytes, newTtl); + ttls.put(familyBytes, newTtl); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, 
ttls, false, ScanType.USER_SCAN)); + + // force a major compaction to remove expired cells + region.compact(true); + // This time raw scan should not return any cells, as all cells have expired. + scanAndAssert(region, Collections.<Cell>emptyList(), rawScan); + } finally { + region.close(); + } + } + + private List<Cell> select(List<Cell> cells, int... indexes) { + List<Cell> newCells = new ArrayList<>(); + for (int i : indexes) { + newCells.add(cells.get(i)); + } + return newCells; + } + + @SuppressWarnings("StatementWithEmptyBody") + private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception { + try (RegionScanner regionScanner = region.getScanner(scan)) { + List<Cell> results = Lists.newArrayList(); + while (regionScanner.next(results)) { } + assertEquals(expected, results); + } + } + + private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception { + region.close(); + HTableDescriptor htd = region.getTableDesc(); + HColumnDescriptor cfd = htd.getFamily(family); + if (ttl > 0) { + cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl)); + } + cfd.setMaxVersions(10); + return HRegion.openHRegion(region.getRegionInfo(), htd, region.getWAL(), conf, + new LocalRegionServerServices(conf, ServerName.valueOf( + InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis())), null); + } + + private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); + HColumnDescriptor cfd = new HColumnDescriptor(family); + if (ttl > 0) { + cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl)); + } + cfd.setMaxVersions(10); + htd.addFamily(cfd); + htd.addCoprocessor(TransactionProcessor.class.getName()); + Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName()); + FileSystem fs = FileSystem.get(conf); + assertTrue(fs.mkdirs(tablePath)); + WALFactory walFactory = new WALFactory(conf, null, tableName + ".hlog"); + WAL hLog = walFactory.getWAL(new byte[]{1}, null); + HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName)); + HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo); + return new HRegion(regionFS, hLog, conf, htd, + new LocalRegionServerServices(conf, ServerName.valueOf( + InetAddress.getLocalHost().getHostName(), 0, System.currentTimeMillis()))); + } + + private void assertKeyValueMatches(List<Cell> results, int index, long[] versions) { + byte[][] values = new byte[versions.length][]; + for (int i = 0; i < versions.length; i++) { + values[i] = Bytes.toBytes(versions[i]); + } + assertKeyValueMatches(results, index, versions, values); + } + + private void assertKeyValueMatches(List<Cell> results, int index, long[] versions, byte[][] values) { + assertEquals(versions.length, results.size()); + assertEquals(values.length, results.size()); + for (int i = 0; i < versions.length; i++) { + Cell kv = results.get(i); + assertArrayEquals(Bytes.toBytes(index), kv.getRow()); + assertEquals(versions[i], kv.getTimestamp()); + assertArrayEquals(values[i], kv.getValue()); + } + } + + @Test + public void testTransactionStateCache() throws Exception { + TransactionStateCache cache = new TransactionStateCache(); + cache.setConf(conf); + cache.startAndWait(); + // verify that the transaction snapshot read matches what we wrote in setupBeforeClass() + TransactionVisibilityState cachedSnapshot = cache.getLatestState(); + 
assertNotNull(cachedSnapshot); + assertEquals(invalidSet, cachedSnapshot.getInvalid()); + cache.stopAndWait(); + } + + private static class LocalRegionServerServices extends MockRegionServerServices { + private final ServerName serverName; + + public LocalRegionServerServices(Configuration conf, ServerName serverName) { + super(conf); + this.serverName = serverName; + } + + @Override + public ServerName getServerName() { + return serverName; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java new file mode 100644 index 0000000..d976085 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase.coprocessor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.Transaction; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractTransactionVisibilityFilterTest; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * HBase 1.3 specific test for filtering logic applied when reading data transactionally. + */ +public class TransactionVisibilityFilterTest extends AbstractTransactionVisibilityFilterTest { + /** + * Test filtering of KeyValues for in-progress and invalid transactions. 
+ * @throws Exception + */ + @Test + public void testFiltering() throws Exception { + TxFilterFactory txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + } + + @Test + public void testSubFilter() throws Exception { + final FilterBase includeFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE; + } + }; + TxFilterFactory txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + + final Filter skipFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.SKIP; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + final Filter includeNextFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + + final Filter nextColFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + } + + @Test + public void testSubFilterOverride() throws Exception { + final FilterBase includeFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws 
IOException { + return ReturnCode.INCLUDE; + } + }; + TxFilterFactory txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE, + Filter.ReturnCode.INCLUDE, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE, + Filter.ReturnCode.INCLUDE)); + + final Filter skipFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.SKIP; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + final Filter includeNextFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + + final Filter nextColFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + } + + private void runFilteringTest(TxFilterFactory txFilterFactory, + List<Filter.ReturnCode> assertCodes) throws Exception { + + /* + * Start and stop some transactions. 
This will give us a transaction state something like the following + * (numbers only reflect ordering, not actual transaction IDs): + * 6 - in progress + * 5 - committed + * 4 - invalid + * 3 - in-progress + * 2 - committed + * 1 - committed + * + * read ptr = 5 + * write ptr = 6 + */ + + Transaction tx1 = txManager.startShort(); + assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET)); + assertTrue(txManager.commit(tx1)); + + Transaction tx2 = txManager.startShort(); + assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET)); + assertTrue(txManager.commit(tx2)); + + Transaction tx3 = txManager.startShort(); + Transaction tx4 = txManager.startShort(); + txManager.invalidate(tx4.getTransactionId()); + + Transaction tx5 = txManager.startShort(); + assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET)); + assertTrue(txManager.commit(tx5)); + + Transaction tx6 = txManager.startShort(); + + Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + Filter filter = txFilterFactory.getTxFilter(tx6, ttls); + + assertEquals(assertCodes.get(5), + filter.filterKeyValue(newKeyValue("row1", "val1", tx6.getTransactionId()))); + assertEquals(assertCodes.get(4), + filter.filterKeyValue(newKeyValue("row1", "val1", tx5.getTransactionId()))); + assertEquals(assertCodes.get(3), + filter.filterKeyValue(newKeyValue("row1", "val1", tx4.getTransactionId()))); + assertEquals(assertCodes.get(2), + filter.filterKeyValue(newKeyValue("row1", "val1", tx3.getTransactionId()))); + assertEquals(assertCodes.get(1), + filter.filterKeyValue(newKeyValue("row1", "val1", tx2.getTransactionId()))); + assertEquals(assertCodes.get(0), + filter.filterKeyValue(newKeyValue("row1", "val1", tx1.getTransactionId()))); + } + + /** + * Test filtering for TTL settings. + * @throws Exception + */ + @Test + public void testTTLFiltering() throws Exception { + Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + ttls.put(FAM, 10L); + ttls.put(FAM2, 30L); + ttls.put(FAM3, 0L); + + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); + Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 1 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 11 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 11 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 21 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 31 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 31 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 1001 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row2", FAM, 
"val1", now - 1 * TxConstants.MAX_TX_PER_MS))); + + // Verify ttl for pre-existing, non-transactional data + long preNow = now / TxConstants.MAX_TX_PER_MS; + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 9L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 10L))); + assertEquals(Filter.ReturnCode.NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 11L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 9L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 10L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 1001L))); + } + + protected KeyValue newKeyValue(String rowkey, String value, long timestamp) { + return new KeyValue(Bytes.toBytes(rowkey), FAM, COL, timestamp, Bytes.toBytes(value)); + } + + protected KeyValue newKeyValue(String rowkey, byte[] family, String value, long timestamp) { + return new KeyValue(Bytes.toBytes(rowkey), family, COL, timestamp, Bytes.toBytes(value)); + } + + private interface TxFilterFactory { + Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs); + } + + private class CustomTxFilter extends TransactionVisibilityFilter { + public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType, + @Nullable Filter cellFilter) { + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + } + + @Override + protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) { + switch (subFilterCode) { + case INCLUDE: + return ReturnCode.INCLUDE; + case INCLUDE_AND_NEXT_COL: + return ReturnCode.INCLUDE_AND_NEXT_COL; + case SKIP: + return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL; + default: + return subFilterCode; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java new file mode 100644 index 0000000..2e9dc17 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + + +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.ImmutableSortedSet; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractHBaseTableTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Test methods of {@link DataJanitorState} + */ +// TODO: Group all the tests that need HBase mini cluster into a suite, so that we start the mini-cluster only once +public class DataJanitorStateTest extends AbstractHBaseTableTest { + + private TableName pruneStateTable; + private DataJanitorState dataJanitorState; + + @Before + public void beforeTest() throws Exception { + pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false, + // Prune state table is a non-transactional table, hence no transaction co-processor + Collections.<String>emptyList()); + table.close(); + + dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + + } + + @After + public void afterTest() throws Exception { + hBaseAdmin.disableTable(pruneStateTable); + hBaseAdmin.deleteTable(pruneStateTable); + } + + @Test + public void testSavePruneUpperBound() throws Exception { + int max = 20; + + // Nothing should be present in the beginning + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L))); + + // Save some region - prune upper bound values + // We should have values for regions 0, 2, 4, 6, ..., max-2 after this + for (long i = 0; i < max; i += 2) { + dataJanitorState.savePruneUpperBoundForRegion(Bytes.toBytes(i), i); + } + + Assert.assertEquals(10L, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L))); + + // Verify all the saved values + for (long i = 0; i < max; ++i) { + long expected = i % 2 == 0 ? 
i : -1; + Assert.assertEquals(expected, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(i))); + } + // Regions not present should give -1 + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(max + 50L))); + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes((max + 10L) * -1))); + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(3L))); + + SortedSet<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Map<byte[], Long> expectedMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (long i = 0; i < max; ++i) { + allRegions.add(Bytes.toBytes(i)); + if (i % 2 == 0) { + expectedMap.put(Bytes.toBytes(i), i); + } + } + Assert.assertEquals(max / 2, expectedMap.size()); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions)); + + SortedSet<byte[]> regions = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR) + .add(Bytes.toBytes((max + 20L) * -1)) + .add(Bytes.toBytes(6L)) + .add(Bytes.toBytes(15L)) + .add(Bytes.toBytes(18L)) + .add(Bytes.toBytes(max + 33L)) + .build(); + expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR) + .put(Bytes.toBytes(6L), 6L) + .put(Bytes.toBytes(18L), 18L) + .build(); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(regions)); + + // Delete regions that have prune upper bound before 15 and are not in the exclude set (4, 8) + ImmutableSortedSet<byte[]> excludeRegions = + ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR).add(Bytes.toBytes(4L)).add(Bytes.toBytes(8L)).build(); + dataJanitorState.deletePruneUpperBounds(15, excludeRegions); + // Regions 0, 2, 6, 10, 12 and 14 should have been deleted now + expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR) + .put(Bytes.toBytes(4L), 4L) + .put(Bytes.toBytes(8L), 8L) + .put(Bytes.toBytes(16L), 16L) + .put(Bytes.toBytes(18L), 18L) + .build(); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions)); + } + + @Test(timeout = 30000L) // The timeout verifies the fix for TEPHRA-230; the test will time out without the fix + public void testSaveRegionTime() throws Exception { + int maxTime = 100; + + // Nothing should be present in the beginning + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(maxTime)); + + // Save regions for time + Map<Long, SortedSet<byte[]>> regionsTime = new TreeMap<>(); + for (long time = 0; time < maxTime; time += 10) { + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (long region = 0; region < 10; region += 2) { + regions.add(Bytes.toBytes((time * 10) + region)); + } + regionsTime.put(time, regions); + dataJanitorState.saveRegionsForTime(time, regions); + } + + // Verify saved regions + Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0)); + Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); + Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31)); + Assert.assertEquals(new TimeRegions(90, regionsTime.get(90L)), + dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000)); + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10)); + + // Now change the count stored for regions saved at time 0, 30 and 90 + try (Table 
stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3); + dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3); + dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 90L), 0); + } + + // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20 + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Querying for anything higher than 90 should give 80 (reproduces TEPHRA-230) + Assert.assertEquals(new TimeRegions(80, regionsTime.get(80L)), + dataJanitorState.getRegionsOnOrBeforeTime(Long.MAX_VALUE)); + + // Delete regions saved on or before time 30 + dataJanitorState.deleteAllRegionsOnOrBeforeTime(30); + // Values on or before time 30 should be deleted + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Counts should be deleted for time on or before 30 + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30)); + Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0)); + } + // Values after time 30 should still exist + Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40)); + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40)); + } + } + + @Test + public void testSaveInactiveTransactionBoundTime() throws Exception { + int maxTime = 100; + + // Nothing should be present in the beginning + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); + + // Save inactive transaction bounds for various time values + for (long time = 0; time < maxTime; time += 10) { + dataJanitorState.saveInactiveTransactionBoundForTime(time, time + 2); + } + + // Verify written values + Assert.assertEquals(2, dataJanitorState.getInactiveTransactionBoundForTime(0)); + Assert.assertEquals(12, dataJanitorState.getInactiveTransactionBoundForTime(10)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(15)); + Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(maxTime + 100)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime((maxTime + 55) * -1L)); + + // Delete values saved on or before time 20 + dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(20); + // Values on or before time 20 should be deleted + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(0)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(20)); + // Values after time 20 should still exist + Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30)); + 
Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90)); + } + + @Test + public void testSaveEmptyRegions() throws Exception { + // Nothing should be present in the beginning + Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + + byte[] region1 = Bytes.toBytes("region1"); + byte[] region2 = Bytes.toBytes("region2"); + byte[] region3 = Bytes.toBytes("region3"); + byte[] region4 = Bytes.toBytes("region4"); + SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4); + + // Now record some empty regions + dataJanitorState.saveEmptyRegionForTime(100, region1); + dataJanitorState.saveEmptyRegionForTime(110, region1); + dataJanitorState.saveEmptyRegionForTime(102, region2); + dataJanitorState.saveEmptyRegionForTime(112, region3); + + Assert.assertEquals(toISet(region1, region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + + Assert.assertEquals(toISet(region1, region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(100, allRegions)); + + Assert.assertEquals(toISet(region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3))); + + Assert.assertEquals(toISet(), + dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of())); + + Assert.assertEquals(toISet(region3), + dataJanitorState.getEmptyRegionsAfterTime(110, allRegions)); + + Assert.assertEquals(toISet(), + dataJanitorState.getEmptyRegionsAfterTime(112, allRegions)); + + // Delete empty regions on or before time 110 + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110); + // Now only region3 should remain + Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions)); + + // Delete empty regions on or before time 150 + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150); + // Now nothing should remain + Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + } + + private ImmutableSortedSet<byte[]> toISet(byte[]... args) { + ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR); + for (byte[] arg : args) { + builder.add(arg); + } + return builder.build(); + } +}
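One detail of testSaveRegionTime worth spelling out: the test tampers with stored region counts using row keys built as Bytes.toBytes(Long.MAX_VALUE - time), which implies the region-time entries are keyed by inverted timestamps. HBase sorts row keys ascending, so inverting the time makes a plain forward scan visit the most recently recorded time first. A hedged sketch of that encoding (helper names here are illustrative, not part of DataJanitorState's API):

    import org.apache.hadoop.hbase.util.Bytes;

    final class InvertedTimeKey {
      /** Larger times sort earlier under ascending row-key order. */
      static byte[] encode(long time) {
        return Bytes.toBytes(Long.MAX_VALUE - time);
      }

      /** Recover the original time from a stored row key. */
      static long decode(byte[] rowKey) {
        return Long.MAX_VALUE - Bytes.toLong(rowKey);
      }
    }

Under this scheme, getRegionsOnOrBeforeTime(t) can open a forward scan starting at encode(t); the first row it encounters is the latest recorded time less than or equal to t, which is exactly the lookup the assertions above exercise.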

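A closing note on timestamp arithmetic, since all of the tests above depend on it: transactional cells carry write-pointer timestamps, i.e. wall-clock milliseconds scaled up by TxConstants.MAX_TX_PER_MS, while pre-existing non-transactional cells keep plain millisecond timestamps. A short, self-contained illustration of the two scales as used in testPreExistingData and testTTLFiltering (values are illustrative):

    import org.apache.tephra.TxConstants;

    final class TimestampScales {
      public static void main(String[] args) {
        long nowMillis = System.currentTimeMillis();
        // Tx-space timestamp: the scale of the V[] array and the r3 cells.
        long txTimestamp = nowMillis * TxConstants.MAX_TX_PER_MS;
        // Wall-clock scale: used for the pre-existing r1/r2 cells and for
        // preNow in testTTLFiltering.
        long wallClockMillis = txTimestamp / TxConstants.MAX_TX_PER_MS;
        assert wallClockMillis == nowMillis;
      }
    }

Family TTLs are given in milliseconds, so the visibility filter must apply them on the millisecond scale for pre-existing cells and on the scaled-up value for transactional ones; the paired assertion groups in testTTLFiltering check both paths.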