http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java new file mode 100644 index 0000000..5d49ac1 --- /dev/null +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessorTest.java @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase96.coprocessor; + +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.regionserver.CompactionRequestor; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.Leases; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tephra.ChangeId; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; +import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.TransactionStateCache; +import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.snapshot.DefaultSnapshotCodec; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.util.TxUtils; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor. + */ +public class TransactionProcessorTest { + private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class); + + // 8 versions, 1 hour apart, latest is current ts. + private static final long[] V; + + static { + long now = System.currentTimeMillis(); + V = new long[9]; + for (int i = 0; i < V.length; i++) { + V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS; + } + } + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + private static MiniDFSCluster dfsCluster; + private static Configuration conf; + private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]}); + private static TransactionVisibilityState txVisibilityState; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration hConf = new Configuration(); + String rootDir = tmpFolder.newFolder().getAbsolutePath(); + hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir); + hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase"); + + dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); + dfsCluster.waitActive(); + conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf()); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); + String localTestDir = tmpFolder.newFolder().getAbsolutePath(); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + + // write an initial transaction snapshot + TransactionSnapshot txSnapshot = + TransactionSnapshot.copyFrom( + System.currentTimeMillis(), V[6] - 1, V[7], invalidSet, + // this will set visibility upper bound to V[6] + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(V[6] - 1, Long.MAX_VALUE, + TransactionType.SHORT))), + new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>()); + txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), + txSnapshot.getWritePointer(), txSnapshot.getInvalid(), + txSnapshot.getInProgress()); + HDFSTransactionStateStorage tmpStorage = + new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); + tmpStorage.startAndWait(); + tmpStorage.writeSnapshot(txSnapshot); + tmpStorage.stopAndWait(); + } + + @AfterClass + public static void shutdownAfterClass() throws Exception { + dfsCluster.shutdown(); + } + + @Test + public void testDataJanitorRegionScanner() throws Exception { + String tableName = "TestRegionScanner"; + byte[] familyBytes = Bytes.toBytes("f"); + byte[] columnBytes = Bytes.toBytes("c"); + HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3)); + try { + region.initialize(); + TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); + LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + + for (int i = 1; i <= 8; i++) { + for (int k = 1; k <= i; k++) { + Put p = new Put(Bytes.toBytes(i)); + p.add(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k])); + region.put(p); + } + } + + List<Cell> results = Lists.newArrayList(); + + // force a flush to clear the data + // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set + LOG.info("Flushing region " + region.getRegionNameAsString()); + region.flushcache(); + + // now a normal scan should only return the valid rows - testing that cleanup works on flush + Scan scan = new Scan(); + scan.setMaxVersions(10); + RegionScanner regionScanner = region.getScanner(scan); + + // first returned value should be "4" with version "4" + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 4, new long[] {V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 5, new long[] {V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 6, new long[] {V[6], V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 7, new long[] {V[6], V[4]}); + + results.clear(); + assertFalse(regionScanner.next(results)); + assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]}); + } finally { + region.close(); + } + } + + @Test + public void testDeleteFiltering() throws Exception { + String tableName = "TestDeleteFiltering"; + byte[] familyBytes = Bytes.toBytes("f"); + byte[] columnBytes = Bytes.toBytes("c"); + HRegion region = createRegion(tableName, familyBytes, 0); + try { + region.initialize(); + TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); + LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + + byte[] row = Bytes.toBytes(1); + for (int i = 4; i < V.length; i++) { + Put p = new Put(row); + p.add(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i])); + region.put(p); + } + + // delete from the third entry back + // take that cell's timestamp + 1 to simulate a delete in a new tx + long deleteTs = V[5] + 1; + Delete d = new Delete(row, deleteTs); + LOG.info("Issuing delete at timestamp " + deleteTs); + // row deletes are not yet supported (TransactionAwareHTable normally handles this) + d.deleteColumns(familyBytes, columnBytes, deleteTs); + region.delete(d); + + List<Cell> results = Lists.newArrayList(); + + // force a flush to clear the data + // during flush, we should drop the deleted version, but not the others + LOG.info("Flushing region " + region.getRegionNameAsString()); + region.flushcache(); + + // now a normal scan should return row with versions at: V[8], V[6]. + // V[7] is invalid and V[5] and prior are deleted. + Scan scan = new Scan(); + scan.setMaxVersions(10); + RegionScanner regionScanner = region.getScanner(scan); + // should be only one row + assertFalse(regionScanner.next(results)); + assertKeyValueMatches(results, 1, + new long[]{V[8], V[6], deleteTs}, + new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]}); + } finally { + region.close(); + } + } + + @Test + public void testDeleteMarkerCleanup() throws Exception { + String tableName = "TestDeleteMarkerCleanup"; + byte[] familyBytes = Bytes.toBytes("f"); + HRegion region = createRegion(tableName, familyBytes, 0); + try { + region.initialize(); + + // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal + long writeTs = txVisibilityState.getVisibilityUpperBound() - 10; + // deletes are performed after the writes, but still before the visibility upper bound + long deleteTs = writeTs + 1; + // write separate columns to confirm that delete markers survive across flushes + byte[] row = Bytes.toBytes(100); + Put p = new Put(row); + + LOG.info("Writing columns at timestamp " + writeTs); + for (int i = 0; i < 5; i++) { + byte[] iBytes = Bytes.toBytes(i); + p.add(familyBytes, iBytes, writeTs, iBytes); + } + region.put(p); + // read all back + Scan scan = new Scan(row); + RegionScanner regionScanner = region.getScanner(scan); + List<Cell> results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + for (int i = 0; i < 5; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + + // force a flush to clear the memstore + LOG.info("Before delete, flushing region " + region.getRegionNameAsString()); + region.flushcache(); + + // delete the odd entries + for (int i = 0; i < 5; i++) { + if (i % 2 == 1) { + // deletes are performed as puts with empty values + Put deletePut = new Put(row); + deletePut.add(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]); + region.put(deletePut); + } + } + + // read all back + scan = new Scan(row); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN)); + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + LOG.info("Got cell " + cell); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + + // force another flush on the delete markers + // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction + LOG.info("After delete, flushing region " + region.getRegionNameAsString()); + region.flushcache(); + + scan = new Scan(row); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN)); + + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + + // force a major compaction + LOG.info("Forcing major compaction of region " + region.getRegionNameAsString()); + region.compactStores(true); + + // perform a raw scan (no filter) to confirm that the delete markers are now gone + scan = new Scan(row); + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, cell.getRow()); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, cell.getQualifier()); + assertArrayEquals(idxBytes, cell.getValue()); + } + } finally { + region.close(); + } + } + + /** + * Test that we correctly preserve the timestamp set for column family delete markers. This is not + * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures + * that we make it easy to interoperate with other systems. + */ + @Test + public void testFamilyDeleteTimestamp() throws Exception { + String tableName = "TestFamilyDeleteTimestamp"; + byte[] family1Bytes = Bytes.toBytes("f1"); + byte[] columnBytes = Bytes.toBytes("c"); + byte[] rowBytes = Bytes.toBytes("row"); + byte[] valBytes = Bytes.toBytes("val"); + HRegion region = createRegion(tableName, family1Bytes, 0); + try { + region.initialize(); + + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + Put p = new Put(rowBytes); + p.add(family1Bytes, columnBytes, now - 10, valBytes); + region.put(p); + + // issue a family delete with an explicit timestamp + Delete delete = new Delete(rowBytes, now); + delete.deleteFamily(family1Bytes, now - 5); + region.delete(delete); + + // test that the delete marker preserved the timestamp + Scan scan = new Scan(); + scan.setMaxVersions(); + RegionScanner scanner = region.getScanner(scan); + List<Cell> results = Lists.newArrayList(); + scanner.next(results); + assertEquals(2, results.size()); + // delete marker should appear first + Cell cell = results.get(0); + assertArrayEquals(new byte[0], cell.getQualifier()); + assertArrayEquals(new byte[0], cell.getValue()); + assertEquals(now - 5, cell.getTimestamp()); + // since this is an unfiltered scan against the region, the original put should be next + cell = results.get(1); + assertArrayEquals(valBytes, cell.getValue()); + assertEquals(now - 10, cell.getTimestamp()); + scanner.close(); + + + // with a filtered scan the original put should disappear + scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN)); + + scanner = region.getScanner(scan); + results = Lists.newArrayList(); + scanner.next(results); + assertEquals(0, results.size()); + scanner.close(); + } finally { + region.close(); + } + } + + @Test + public void testPreExistingData() throws Exception { + String tableName = "TestPreExistingData"; + byte[] familyBytes = Bytes.toBytes("f"); + long ttlMillis = TimeUnit.DAYS.toMillis(14); + HRegion region = createRegion(tableName, familyBytes, ttlMillis); + try { + region.initialize(); + + // timestamps for pre-existing, non-transactional data + long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS; + long older = now - ttlMillis / 2; + long newer = now - ttlMillis / 3; + // timestamps for transactional data + long nowTx = txVisibilityState.getVisibilityUpperBound(); + long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS; + long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS; + + Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + ttls.put(familyBytes, ttlMillis); + + List<Cell> cells = new ArrayList<>(); + cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11"))); + cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12"))); + cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21"))); + cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22"))); + cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31"))); + cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32"))); + + // Write non-transactional and transactional data + for (Cell c : cells) { + region.put(new Put(c.getRow()).add(c.getFamily(), c.getQualifier(), c.getTimestamp(), c.getValue())); + } + + Scan rawScan = new Scan(); + rawScan.setMaxVersions(); + + Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState); + Scan txScan = new Scan(); + txScan.setMaxVersions(); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // read all back with raw scanner + scanAndAssert(region, cells, rawScan); + + // read all back with transaction filter + scanAndAssert(region, cells, txScan); + + // force a flush to clear the memstore + region.flushcache(); + scanAndAssert(region, cells, txScan); + + // force a major compaction to remove any expired cells + region.compactStores(true); + scanAndAssert(region, cells, txScan); + + // Reduce TTL, this should make cells with timestamps older and olderTx expire + long newTtl = ttlMillis / 2 - 1; + region = updateTtl(region, familyBytes, newTtl); + ttls.put(familyBytes, newTtl); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // Raw scan should still give all cells + scanAndAssert(region, cells, rawScan); + // However, tx scan should not return expired cells + scanAndAssert(region, select(cells, 1, 3, 5), txScan); + + region.flushcache(); + scanAndAssert(region, cells, rawScan); + + // force a major compaction to remove any expired cells + region.compactStores(true); + // This time raw scan too should not return expired cells, as they would be dropped during major compaction + scanAndAssert(region, select(cells, 1, 3, 5), rawScan); + + // Reduce TTL again to 1 ms, this should expire all cells + newTtl = 1; + region = updateTtl(region, familyBytes, newTtl); + ttls.put(familyBytes, newTtl); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // force a major compaction to remove expired cells + region.compactStores(true); + // This time raw scan should not return any cells, as all cells have expired. + scanAndAssert(region, Collections.<Cell>emptyList(), rawScan); + } finally { + region.close(); + } + } + + private List<Cell> select(List<Cell> cells, int... indexes) { + List<Cell> newCells = new ArrayList<>(); + for (int i : indexes) { + newCells.add(cells.get(i)); + } + return newCells; + } + + @SuppressWarnings("StatementWithEmptyBody") + private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception { + try (RegionScanner regionScanner = region.getScanner(scan)) { + List<Cell> results = Lists.newArrayList(); + while (regionScanner.next(results)) { } + assertEquals(expected, results); + } + } + + private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception { + region.close(); + HTableDescriptor htd = region.getTableDesc(); + HColumnDescriptor cfd = new HColumnDescriptor(family); + if (ttl > 0) { + cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl)); + } + cfd.setMaxVersions(10); + htd.addFamily(cfd); + return HRegion.openHRegion(region.getRegionInfo(), htd, region.getLog(), conf, + new MockRegionServerServices(conf, null), null); + } + + private HRegion createRegion(String tableName, byte[] family, long ttl) throws IOException { + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); + HColumnDescriptor cfd = new HColumnDescriptor(family); + if (ttl > 0) { + cfd.setValue(TxConstants.PROPERTY_TTL, String.valueOf(ttl)); + } + cfd.setMaxVersions(10); + htd.addFamily(cfd); + htd.addCoprocessor(TransactionProcessor.class.getName()); + Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), htd.getTableName()); + Path hlogPath = new Path(FSUtils.getRootDir(conf) + "/hlog"); + FileSystem fs = FileSystem.get(conf); + assertTrue(fs.mkdirs(tablePath)); + HLog hLog = HLogFactory.createHLog(fs, hlogPath, tableName, conf); + HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf(tableName)); + HRegionFileSystem regionFS = HRegionFileSystem.createRegionOnFileSystem(conf, fs, tablePath, regionInfo); + return new HRegion(regionFS, hLog, conf, htd, new MockRegionServerServices(conf, null)); + } + + private void assertKeyValueMatches(List<Cell> results, int index, long[] versions) { + byte[][] values = new byte[versions.length][]; + for (int i = 0; i < versions.length; i++) { + values[i] = Bytes.toBytes(versions[i]); + } + assertKeyValueMatches(results, index, versions, values); + } + + private void assertKeyValueMatches(List<Cell> results, int index, long[] versions, byte[][] values) { + assertEquals(versions.length, results.size()); + assertEquals(values.length, results.size()); + for (int i = 0; i < versions.length; i++) { + Cell kv = results.get(i); + assertArrayEquals(Bytes.toBytes(index), kv.getRow()); + assertEquals(versions[i], kv.getTimestamp()); + assertArrayEquals(values[i], kv.getValue()); + } + } + + @Test + public void testTransactionStateCache() throws Exception { + TransactionStateCache cache = new TransactionStateCache(); + cache.setConf(conf); + cache.startAndWait(); + // verify that the transaction snapshot read matches what we wrote in setupBeforeClass() + TransactionVisibilityState cachedSnapshot = cache.getLatestState(); + assertNotNull(cachedSnapshot); + assertEquals(invalidSet, cachedSnapshot.getInvalid()); + cache.stopAndWait(); + } + + private static class MockRegionServerServices implements RegionServerServices { + private final Configuration hConf; + private final ZooKeeperWatcher zookeeper; + private final Map<String, HRegion> regions = new HashMap<String, HRegion>(); + private boolean stopping = false; + private final ConcurrentSkipListMap<byte[], Boolean> rit = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + private HFileSystem hfs = null; + private ServerName serverName = null; + private RpcServerInterface rpcServer = null; + private volatile boolean abortRequested; + + + public MockRegionServerServices(Configuration hConf, ZooKeeperWatcher zookeeper) { + this.hConf = hConf; + this.zookeeper = zookeeper; + } + + @Override + public boolean isStopping() { + return stopping; + } + + @Override + public HLog getWAL(HRegionInfo regionInfo) throws IOException { + return null; + } + + @Override + public CompactionRequestor getCompactionRequester() { + return null; + } + + @Override + public FlushRequester getFlushRequester() { + return null; + } + + @Override + public RegionServerAccounting getRegionServerAccounting() { + return null; + } + + @Override + public TableLockManager getTableLockManager() { + return new TableLockManager.NullTableLockManager(); + } + + @Override + public void postOpenDeployTasks(HRegion r, CatalogTracker ct) throws KeeperException, IOException { + } + + @Override + public RpcServerInterface getRpcServer() { + return rpcServer; + } + + @Override + public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() { + return rit; + } + + @Override + public FileSystem getFileSystem() { + return hfs; + } + + @Override + public Leases getLeases() { + return null; + } + + @Override + public ExecutorService getExecutorService() { + return null; + } + + @Override + public CatalogTracker getCatalogTracker() { + return null; + } + + @Override + public Map<String, HRegion> getRecoveringRegions() { + return null; + } + + @Override + public void updateRegionFavoredNodesMapping(String encodedRegionName, List<HBaseProtos.ServerName> favoredNodes) { + } + + @Override + public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { + return new InetSocketAddress[0]; + } + + @Override + public void addToOnlineRegions(HRegion r) { + regions.put(r.getRegionNameAsString(), r); + } + + @Override + public boolean removeFromOnlineRegions(HRegion r, ServerName destination) { + return regions.remove(r.getRegionInfo().getEncodedName()) != null; + } + + @Override + public HRegion getFromOnlineRegions(String encodedRegionName) { + return regions.get(encodedRegionName); + } + + @Override + public List<HRegion> getOnlineRegions(TableName tableName) throws IOException { + return null; + } + + @Override + public Configuration getConfiguration() { + return hConf; + } + + @Override + public ZooKeeperWatcher getZooKeeper() { + return zookeeper; + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public void abort(String why, Throwable e) { + this.abortRequested = true; + } + + @Override + public boolean isAborted() { + return abortRequested; + } + + @Override + public void stop(String why) { + this.stopping = true; + } + + @Override + public boolean isStopped() { + return stopping; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java new file mode 100644 index 0000000..678ba5a --- /dev/null +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilterTest.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase96.coprocessor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.Transaction; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractTransactionVisibilityFilterTest; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * HBase 0.96 specific test for filtering logic applied when reading data transactionally. + */ +public class TransactionVisibilityFilterTest extends AbstractTransactionVisibilityFilterTest { + /** + * Test filtering of KeyValues for in-progress and invalid transactions. + * @throws Exception + */ + @Test + public void testFiltering() throws Exception { + TxFilterFactory txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + } + + @Test + public void testSubFilter() throws Exception { + final FilterBase includeFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE; + } + }; + TxFilterFactory txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + + final Filter skipFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.SKIP; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + final Filter includeNextFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + + final Filter nextColFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + } + + @Test + public void testSubFilterOverride() throws Exception { + final FilterBase includeFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE; + } + }; + TxFilterFactory txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE, + Filter.ReturnCode.INCLUDE, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE, + Filter.ReturnCode.INCLUDE)); + + final Filter skipFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.SKIP; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + final Filter includeNextFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.INCLUDE_AND_NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + Filter.ReturnCode.INCLUDE_AND_NEXT_COL)); + + final Filter nextColFilter = new FilterBase() { + @Override + public ReturnCode filterKeyValue(Cell ignored) throws IOException { + return ReturnCode.NEXT_COL; + } + }; + txFilterFactory = new TxFilterFactory() { + @Override + public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) { + return new CustomTxFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + } + }; + runFilteringTest(txFilterFactory, + ImmutableList.of(Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.SKIP, + Filter.ReturnCode.NEXT_COL, + Filter.ReturnCode.NEXT_COL)); + + } + + private void runFilteringTest(TxFilterFactory txFilterFactory, + List<Filter.ReturnCode> assertCodes) throws Exception { + + /* + * Start and stop some transactions. This will give us a transaction state something like the following + * (numbers only reflect ordering, not actual transaction IDs): + * 6 - in progress + * 5 - committed + * 4 - invalid + * 3 - in-progress + * 2 - committed + * 1 - committed + * + * read ptr = 5 + * write ptr = 6 + */ + + Transaction tx1 = txManager.startShort(); + assertTrue(txManager.canCommit(tx1, EMPTY_CHANGESET)); + assertTrue(txManager.commit(tx1)); + + Transaction tx2 = txManager.startShort(); + assertTrue(txManager.canCommit(tx2, EMPTY_CHANGESET)); + assertTrue(txManager.commit(tx2)); + + Transaction tx3 = txManager.startShort(); + Transaction tx4 = txManager.startShort(); + txManager.invalidate(tx4.getTransactionId()); + + Transaction tx5 = txManager.startShort(); + assertTrue(txManager.canCommit(tx5, EMPTY_CHANGESET)); + assertTrue(txManager.commit(tx5)); + + Transaction tx6 = txManager.startShort(); + + Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + Filter filter = txFilterFactory.getTxFilter(tx6, ttls); + + assertEquals(assertCodes.get(5), + filter.filterKeyValue(newKeyValue("row1", "val1", tx6.getTransactionId()))); + assertEquals(assertCodes.get(4), + filter.filterKeyValue(newKeyValue("row1", "val1", tx5.getTransactionId()))); + assertEquals(assertCodes.get(3), + filter.filterKeyValue(newKeyValue("row1", "val1", tx4.getTransactionId()))); + assertEquals(assertCodes.get(2), + filter.filterKeyValue(newKeyValue("row1", "val1", tx3.getTransactionId()))); + assertEquals(assertCodes.get(1), + filter.filterKeyValue(newKeyValue("row1", "val1", tx2.getTransactionId()))); + assertEquals(assertCodes.get(0), + filter.filterKeyValue(newKeyValue("row1", "val1", tx1.getTransactionId()))); + } + + /** + * Test filtering for TTL settings. + * @throws Exception + */ + @Test + public void testTTLFiltering() throws Exception { + Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + ttls.put(FAM, 10L); + ttls.put(FAM2, 30L); + ttls.put(FAM3, 0L); + + Transaction tx = txManager.startShort(); + long now = tx.getVisibilityUpperBound(); + Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 1 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now - 11 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 11 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 21 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM2, "val1", now - 31 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 31 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", now - 1001 * TxConstants.MAX_TX_PER_MS))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row2", FAM, "val1", now - 1 * TxConstants.MAX_TX_PER_MS))); + + // Verify ttl for pre-existing, non-transactional data + long preNow = now / TxConstants.MAX_TX_PER_MS; + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 9L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 10L))); + assertEquals(Filter.ReturnCode.NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM, "val1", preNow - 11L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 9L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 10L))); + assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, + filter.filterKeyValue(newKeyValue("row1", FAM3, "val1", preNow - 1001L))); + } + + protected KeyValue newKeyValue(String rowkey, String value, long timestamp) { + return new KeyValue(Bytes.toBytes(rowkey), FAM, COL, timestamp, Bytes.toBytes(value)); + } + + protected KeyValue newKeyValue(String rowkey, byte[] family, String value, long timestamp) { + return new KeyValue(Bytes.toBytes(rowkey), family, COL, timestamp, Bytes.toBytes(value)); + } + + private interface TxFilterFactory { + Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs); + } + + private class CustomTxFilter extends TransactionVisibilityFilter { + public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType, + @Nullable Filter cellFilter) { + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + } + + @Override + protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) { + switch (subFilterCode) { + case INCLUDE: + return ReturnCode.INCLUDE; + case INCLUDE_AND_NEXT_COL: + return ReturnCode.INCLUDE_AND_NEXT_COL; + case SKIP: + return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL; + default: + return subFilterCode; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/HBase98ConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/HBase98ConfigurationProvider.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/HBase98ConfigurationProvider.java deleted file mode 100644 index 7fb068b..0000000 --- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/HBase98ConfigurationProvider.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase98; - -import co.cask.tephra.util.ConfigurationProvider; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; - -/** - * HBase 0.98 version of {@link co.cask.tephra.util.ConfigurationProvider}. - */ -public class HBase98ConfigurationProvider extends ConfigurationProvider { - @Override - public Configuration get() { - return HBaseConfiguration.create(); - } - - @Override - public Configuration get(Configuration baseConf) { - return HBaseConfiguration.create(baseConf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/SecondaryIndexTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/SecondaryIndexTable.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/SecondaryIndexTable.java deleted file mode 100644 index 1dd4df8..0000000 --- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/SecondaryIndexTable.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.hbase98; - -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.distributed.TransactionServiceClient; -import com.google.common.base.Throwables; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * A Transactional SecondaryIndexTable. - */ -public class SecondaryIndexTable { - private byte[] secondaryIndex; - private TransactionAwareHTable transactionAwareHTable; - private TransactionAwareHTable secondaryIndexTable; - private TransactionContext transactionContext; - private final TableName secondaryIndexTableName; - private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily"); - private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r'); - private static final byte[] DELIMITER = new byte[] {0}; - - public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable, - byte[] secondaryIndex) { - secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx"); - HTable secondaryIndexHTable = null; - HBaseAdmin hBaseAdmin = null; - try { - hBaseAdmin = new HBaseAdmin(hTable.getConfiguration()); - if (!hBaseAdmin.tableExists(secondaryIndexTableName)) { - hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName)); - } - secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName); - } catch (Exception e) { - Throwables.propagate(e); - } finally { - try { - hBaseAdmin.close(); - } catch (Exception e) { - Throwables.propagate(e); - } - } - - this.secondaryIndex = secondaryIndex; - this.transactionAwareHTable = new TransactionAwareHTable(hTable); - this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable); - this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable, - secondaryIndexTable); - } - - public Result get(Get get) throws IOException { - return get(Collections.singletonList(get))[0]; - } - - public Result[] get(List<Get> gets) throws IOException { - try { - transactionContext.start(); - Result[] result = transactionAwareHTable.get(gets); - transactionContext.finish(); - return result; - } catch (Exception e) { - try { - transactionContext.abort(); - } catch (TransactionFailureException e1) { - throw new IOException("Could not rollback transaction", e1); - } - } - return null; - } - - public Result[] getByIndex(byte[] value) throws IOException { - try { - transactionContext.start(); - Scan scan = new Scan(value, Bytes.add(value, new byte[0])); - scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier); - ResultScanner indexScanner = secondaryIndexTable.getScanner(scan); - - ArrayList<Get> gets = new ArrayList<Get>(); - for (Result result : indexScanner) { - for (Cell cell : result.listCells()) { - gets.add(new Get(cell.getValue())); - } - } - Result[] results = transactionAwareHTable.get(gets); - transactionContext.finish(); - return results; - } catch (Exception e) { - try { - transactionContext.abort(); - } catch (TransactionFailureException e1) { - throw new IOException("Could not rollback transaction", e1); - } - } - return null; - } - - public void put(Put put) throws IOException { - put(Collections.singletonList(put)); - } - - - public void put(List<Put> puts) throws IOException { - try { - transactionContext.start(); - ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>(); - for (Put put : puts) { - List<Put> indexPuts = new ArrayList<Put>(); - Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet(); - for (Map.Entry<byte [], List<KeyValue>> family : familyMap) { - for (KeyValue value : family.getValue()) { - if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(), - secondaryIndex, 0, secondaryIndex.length)) { - byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER, - Bytes.add(value.getValue(), DELIMITER, - value.getRow())); - Put indexPut = new Put(secondaryRow); - indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow()); - indexPuts.add(indexPut); - } - } - } - secondaryIndexPuts.addAll(indexPuts); - } - transactionAwareHTable.put(puts); - secondaryIndexTable.put(secondaryIndexPuts); - transactionContext.finish(); - } catch (Exception e) { - try { - transactionContext.abort(); - } catch (TransactionFailureException e1) { - throw new IOException("Could not rollback transaction", e1); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java deleted file mode 100644 index b9087ab..0000000 --- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/TransactionAwareHTable.java +++ /dev/null @@ -1,642 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.hbase98; - -import co.cask.tephra.AbstractTransactionAwareTable; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TxConstants; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.OperationWithAttributes; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; - -/** - * A Transaction Aware HTable implementation for HBase 0.98. Operations are committed as usual, - * but upon a failed or aborted transaction, they are rolled back to the state before the transaction - * was started. - */ -public class TransactionAwareHTable extends AbstractTransactionAwareTable - implements HTableInterface, TransactionAware { - - private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class); - private final HTableInterface hTable; - - /** - * Create a transactional aware instance of the passed HTable - * - * @param hTable underlying HBase table to use - */ - public TransactionAwareHTable(HTableInterface hTable) { - this(hTable, false); - } - - /** - * Create a transactional aware instance of the passed HTable - * - * @param hTable underlying HBase table to use - * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN}) - */ - public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) { - this(hTable, conflictLevel, false); - } - - /** - * Create a transactional aware instance of the passed HTable, with the option - * of allowing non-transactional operations. - * @param hTable underlying HBase table to use - * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete) - * will be available, though non-transactional - */ - public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) { - this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional); - } - - /** - * Create a transactional aware instance of the passed HTable, with the option - * of allowing non-transactional operations. - * @param hTable underlying HBase table to use - * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN}) - * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete) - * will be available, though non-transactional - */ - public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel, - boolean allowNonTransactional) { - super(conflictLevel, allowNonTransactional); - this.hTable = hTable; - } - - /* AbstractTransactionAwareTable implementation */ - - @Override - protected byte[] getTableKey() { - return getTableName(); - } - - @Override - protected boolean doCommit() throws IOException { - hTable.flushCommits(); - return true; - } - - @Override - protected boolean doRollback() throws Exception { - try { - // pre-size arraylist of deletes - int size = 0; - for (Set<ActionChange> cs : changeSets.values()) { - size += cs.size(); - } - List<Delete> rollbackDeletes = new ArrayList<>(size); - for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) { - long transactionTimestamp = entry.getKey(); - for (ActionChange change : entry.getValue()) { - byte[] row = change.getRow(); - byte[] family = change.getFamily(); - byte[] qualifier = change.getQualifier(); - Delete rollbackDelete = new Delete(row); - rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); - switch (conflictLevel) { - case ROW: - case NONE: - // issue family delete for the tx write pointer - rollbackDelete.deleteFamilyVersion(change.getFamily(), transactionTimestamp); - break; - case COLUMN: - if (family != null && qualifier == null) { - rollbackDelete.deleteFamilyVersion(family, transactionTimestamp); - } else if (family != null && qualifier != null) { - rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp); - } - break; - default: - throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel); - } - rollbackDeletes.add(rollbackDelete); - } - } - hTable.delete(rollbackDeletes); - return true; - } finally { - try { - hTable.flushCommits(); - } catch (Exception e) { - LOG.error("Could not flush HTable commits", e); - } - tx = null; - changeSets.clear(); - } - } - - /* HTableInterface implementation */ - - @Override - public byte[] getTableName() { - return hTable.getTableName(); - } - - @Override - public TableName getName() { - return hTable.getName(); - } - - @Override - public Configuration getConfiguration() { - return hTable.getConfiguration(); - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - return hTable.getTableDescriptor(); - } - - @Override - public boolean exists(Get get) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.exists(transactionalizeAction(get)); - } - - @Override - public Boolean[] exists(List<Get> gets) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - List<Get> transactionalizedGets = new ArrayList<Get>(gets.size()); - for (Get get : gets) { - transactionalizedGets.add(transactionalizeAction(get)); - } - return hTable.exists(transactionalizedGets); - } - - @Override - public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - hTable.batch(transactionalizeActions(actions), results); - } - - @Override - public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.batch(transactionalizeActions(actions)); - } - - @Override - public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws - IOException, InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - hTable.batchCallback(transactionalizeActions(actions), results, callback); - } - - @Override - public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, - InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.batchCallback(transactionalizeActions(actions), callback); - } - - @Override - public Result get(Get get) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.get(transactionalizeAction(get)); - } - - @Override - public Result[] get(List<Get> gets) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - ArrayList<Get> transactionalizedGets = new ArrayList<Get>(); - for (Get get : gets) { - transactionalizedGets.add(transactionalizeAction(get)); - } - return hTable.get(transactionalizedGets); - } - - @Override - public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { - if (allowNonTransactional) { - return hTable.getRowOrBefore(row, family); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public ResultScanner getScanner(Scan scan) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.getScanner(transactionalizeAction(scan)); - } - - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - Scan scan = new Scan(); - scan.addFamily(family); - return hTable.getScanner(transactionalizeAction(scan)); - } - - @Override - public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - Scan scan = new Scan(); - scan.addColumn(family, qualifier); - return hTable.getScanner(transactionalizeAction(scan)); - } - - @Override - public void put(Put put) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - Put txPut = transactionalizeAction(put); - hTable.put(txPut); - } - - @Override - public void put(List<Put> puts) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - List<Put> transactionalizedPuts = new ArrayList<Put>(puts.size()); - for (Put put : puts) { - Put txPut = transactionalizeAction(put); - transactionalizedPuts.add(txPut); - } - hTable.put(transactionalizedPuts); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { - if (allowNonTransactional) { - return hTable.checkAndPut(row, family, qualifier, value, put); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public void delete(Delete delete) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - hTable.delete(transactionalizeAction(delete)); - } - - @Override - public void delete(List<Delete> deletes) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - List<Delete> transactionalizedDeletes = new ArrayList<Delete>(deletes.size()); - for (Delete delete : deletes) { - Delete txDelete = transactionalizeAction(delete); - transactionalizedDeletes.add(txDelete); - } - hTable.delete(transactionalizedDeletes); - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) - throws IOException { - if (allowNonTransactional) { - return hTable.checkAndDelete(row, family, qualifier, value, delete); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareFilter.CompareOp compareOp, byte[] value, RowMutations rowMutations) - throws IOException { - if (allowNonTransactional) { - return hTable.checkAndMutate(row, family, qualifier, compareOp, value, rowMutations); - } - - throw new UnsupportedOperationException("checkAndMutate operation is not supported transactionally"); - } - - @Override - public void mutateRow(RowMutations rm) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - RowMutations transactionalMutations = new RowMutations(); - for (Mutation mutation : rm.getMutations()) { - if (mutation instanceof Put) { - transactionalMutations.add(transactionalizeAction((Put) mutation)); - } else if (mutation instanceof Delete) { - transactionalMutations.add(transactionalizeAction((Delete) mutation)); - } - } - hTable.mutateRow(transactionalMutations); - } - - @Override - public Result append(Append append) throws IOException { - if (allowNonTransactional) { - return hTable.append(append); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public Result increment(Increment increment) throws IOException { - if (allowNonTransactional) { - return hTable.increment(increment); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { - if (allowNonTransactional) { - return hTable.incrementColumnValue(row, family, qualifier, amount); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) - throws IOException { - if (allowNonTransactional) { - return hTable.incrementColumnValue(row, family, qualifier, amount, durability); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) - throws IOException { - if (allowNonTransactional) { - return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public boolean isAutoFlush() { - return hTable.isAutoFlush(); - } - - @Override - public void flushCommits() throws IOException { - hTable.flushCommits(); - } - - @Override - public void close() throws IOException { - hTable.close(); - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - return hTable.coprocessorService(row); - } - - @Override - public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, - Batch.Call<T, R> callable) - throws ServiceException, Throwable { - return hTable.coprocessorService(service, startKey, endKey, callable); - } - - @Override - public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, - Batch.Call<T, R> callable, Batch.Callback<R> callback) - throws ServiceException, Throwable { - hTable.coprocessorService(service, startKey, endKey, callable, callback); - } - - @Override - public <R extends Message> Map<byte[], R> batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, - R responsePrototype) throws ServiceException, Throwable { - return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); - } - - @Override - public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor, - Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback) - throws ServiceException, Throwable { - hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); - } - - @Override - public void setAutoFlush(boolean autoFlush) { - setAutoFlushTo(autoFlush); - } - - @Override - public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { - hTable.setAutoFlush(autoFlush, clearBufferOnFail); - } - - @Override - public void setAutoFlushTo(boolean autoFlush) { - hTable.setAutoFlushTo(autoFlush); - } - - @Override - public long getWriteBufferSize() { - return hTable.getWriteBufferSize(); - } - - @Override - public void setWriteBufferSize(long writeBufferSize) throws IOException { - hTable.setWriteBufferSize(writeBufferSize); - } - - // Helpers to get copies of objects with the timestamp set to the current transaction timestamp. - - private Get transactionalizeAction(Get get) throws IOException { - addToOperation(get, tx); - return get; - } - - private Scan transactionalizeAction(Scan scan) throws IOException { - addToOperation(scan, tx); - return scan; - } - - private Put transactionalizeAction(Put put) throws IOException { - Put txPut = new Put(put.getRow(), tx.getWritePointer()); - Set<Map.Entry<byte[], List<Cell>>> familyMap = put.getFamilyCellMap().entrySet(); - if (!familyMap.isEmpty()) { - for (Map.Entry<byte[], List<Cell>> family : familyMap) { - List<Cell> familyValues = family.getValue(); - if (!familyValues.isEmpty()) { - for (Cell value : familyValues) { - txPut.add(value.getFamily(), value.getQualifier(), tx.getWritePointer(), value.getValue()); - addToChangeSet(txPut.getRow(), value.getFamily(), value.getQualifier()); - } - } - } - } - for (Map.Entry<String, byte[]> entry : put.getAttributesMap().entrySet()) { - txPut.setAttribute(entry.getKey(), entry.getValue()); - } - txPut.setDurability(put.getDurability()); - addToOperation(txPut, tx); - return txPut; - } - - private Delete transactionalizeAction(Delete delete) throws IOException { - long transactionTimestamp = tx.getWritePointer(); - - byte[] deleteRow = delete.getRow(); - Delete txDelete = new Delete(deleteRow, transactionTimestamp); - - Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap(); - if (familyToDelete.isEmpty()) { - // perform a row delete if we are using row-level conflict detection - if (conflictLevel == TxConstants.ConflictDetection.ROW || - conflictLevel == TxConstants.ConflictDetection.NONE) { - // Row delete leaves delete markers in all column families of the table - // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet - for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) { - // no need to identify individual columns deleted - addToChangeSet(deleteRow, columnDescriptor.getName(), null); - } - } else { - Result result = get(new Get(delete.getRow())); - // Delete everything - NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap(); - for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) { - NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey()); - for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) { - txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp); - addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey()); - } - } - } - } else { - for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) { - byte[] family = familyEntry.getKey(); - List<Cell> entries = familyEntry.getValue(); - boolean isFamilyDelete = false; - if (entries.size() == 1) { - Cell cell = entries.get(0); - isFamilyDelete = CellUtil.isDeleteFamily(cell); - } - if (isFamilyDelete) { - if (conflictLevel == TxConstants.ConflictDetection.ROW || - conflictLevel == TxConstants.ConflictDetection.NONE) { - // no need to identify individual columns deleted - txDelete.deleteFamily(family); - addToChangeSet(deleteRow, family, null); - } else { - Result result = get(new Get(delete.getRow()).addFamily(family)); - // Delete entire family - NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family); - for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) { - txDelete.deleteColumns(family, column.getKey(), transactionTimestamp); - addToChangeSet(deleteRow, family, column.getKey()); - } - } - } else { - for (Cell value : entries) { - txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp); - addToChangeSet(deleteRow, value.getFamily(), value.getQualifier()); - } - } - } - } - for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) { - txDelete.setAttribute(entry.getKey(), entry.getValue()); - } - txDelete.setDurability(delete.getDurability()); - return txDelete; - } - - private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException { - List<Row> transactionalizedActions = new ArrayList<>(actions.size()); - for (Row action : actions) { - if (action instanceof Get) { - transactionalizedActions.add(transactionalizeAction((Get) action)); - } else if (action instanceof Put) { - transactionalizedActions.add(transactionalizeAction((Put) action)); - } else if (action instanceof Delete) { - transactionalizedActions.add(transactionalizeAction((Delete) action)); - } else { - transactionalizedActions.add(action); - } - } - return transactionalizedActions; - } - - public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { - op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/CellSkipFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/CellSkipFilter.java deleted file mode 100644 index dcf29d6..0000000 --- a/tephra-hbase-compat-0.98/src/main/java/co/cask/tephra/hbase98/coprocessor/CellSkipFilter.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase98.coprocessor; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; - -import java.io.IOException; -import java.util.List; - -/** - * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue} - * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL}, - * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with different - * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}. - * Please see TEPHRA-169 for more details. - */ -public class CellSkipFilter extends FilterBase { - private final Filter filter; - // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL - private KeyValue skipColumn = null; - - public CellSkipFilter(Filter filter) { - this.filter = filter; - } - - /** - * Determines whether the current cell should be skipped. The cell will be skipped - * if the previous keyvalue had the same key as the current cell. This means filter already responded - * for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL. - * @param cell the {@link Cell} to be tested for skipping - * @return true is current cell should be skipped, false otherwise - */ - private boolean skipCellVersion(Cell cell) { - return skipColumn != null - && skipColumn.matchingRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) - && skipColumn.matchingColumn(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - } - - @Override - public ReturnCode filterKeyValue(Cell cell) throws IOException { - if (skipCellVersion(cell)) { - return ReturnCode.NEXT_COL; - } - - ReturnCode code = filter.filterKeyValue(cell); - if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) { - // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL - skipColumn = KeyValue.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength()); - } else { - skipColumn = null; - } - return code; - } - - @Override - public boolean filterRow() throws IOException { - return filter.filterRow(); - } - - @Override - public Cell transformCell(Cell cell) throws IOException { - return filter.transformCell(cell); - } - - @Override - public void reset() throws IOException { - filter.reset(); - } - - @Override - public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { - return filter.filterRowKey(buffer, offset, length); - } - - @Override - public boolean filterAllRemaining() throws IOException { - return filter.filterAllRemaining(); - } - - @Override - public void filterRowCells(List<Cell> kvs) throws IOException { - filter.filterRowCells(kvs); - } - - @Override - public boolean hasFilterRow() { - return filter.hasFilterRow(); - } - - @SuppressWarnings("deprecation") - @Override - public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { - return filter.getNextKeyHint(currentKV); - } - - @Override - public Cell getNextCellHint(Cell currentKV) throws IOException { - return filter.getNextCellHint(currentKV); - } - - @Override - public boolean isFamilyEssential(byte[] name) throws IOException { - return filter.isFamilyEssential(name); - } - - @Override - public byte[] toByteArray() throws IOException { - return filter.toByteArray(); - } -} -
