Repository: incubator-tephra Updated Branches: refs/heads/master d3c0b6157 -> 6edfa091c
TEPHRA-208 Compaction removes rows incorrectly for family deletes with column level conflict detection This closes #26 from GitHub. Signed-off-by: Thomas D'Silva <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/6edfa091 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/6edfa091 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/6edfa091 Branch: refs/heads/master Commit: 6edfa091cbda59a8cee2613d8456b5a7ccad38a2 Parents: d3c0b61 Author: Thomas D'Silva <[email protected]> Authored: Mon Jan 9 15:02:00 2017 -0800 Committer: Thomas D'Silva <[email protected]> Committed: Thu Feb 2 12:58:38 2017 -0800 ---------------------------------------------------------------------- .../TransactionVisibilityFilter.java | 6 +- .../hbase/TransactionAwareHTableTest.java | 119 +++++++++++++++++-- .../TransactionVisibilityFilter.java | 6 +- .../hbase/TransactionAwareHTableTest.java | 110 ++++++++++++++++- .../TransactionVisibilityFilter.java | 9 +- .../hbase/TransactionAwareHTableTest.java | 110 ++++++++++++++++- .../TransactionVisibilityFilter.java | 9 +- .../hbase/TransactionAwareHTableTest.java | 114 ++++++++++++++++-- .../TransactionVisibilityFilter.java | 7 +- .../hbase/TransactionAwareHTableTest.java | 116 +++++++++++++++++- 10 files changed, 572 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index a986d1c..3675268 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -291,6 +291,7 @@ public class TransactionVisibilityFilter extends FilterBase { private static final class DeleteTracker { private long familyDeleteTs; + private byte[] rowKey; public static boolean isFamilyDelete(Cell cell) { return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && @@ -300,14 +301,17 @@ public class TransactionVisibilityFilter extends FilterBase { public void addFamilyDelete(Cell delete) { this.familyDeleteTs = delete.getTimestamp(); + this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength()); } public boolean isDeleted(Cell cell) { - return cell.getTimestamp() <= familyDeleteTs; + return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs; } public void reset() { this.familyDeleteTs = 0; + this.rowKey = null; } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index afd7c01..9c5dca2 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -20,14 +20,18 @@ package org.apache.tephra.hbase; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.OperationWithAttributes; @@ -43,23 +47,29 @@ import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; - import org.apache.tephra.hbase.coprocessor.TransactionProcessor; - import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.TxConstants.ConflictDetection; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.InMemoryTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +80,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; - import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -92,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; - + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static MiniDFSCluster dfsCluster; + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); @@ -136,18 +151,33 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } + //override AbstractHBaseTableTest.startMiniCluster to setup configuration @BeforeClass - public static void setupBeforeClass() throws Exception { - txStateStorage = new InMemoryTransactionStateStorage(); - txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); - txManager.startAndWait(); - } + public static void startMiniCluster() throws Exception { + // setup the configuration to use HDFSTransactionStateStorage + conf = HBaseConfiguration.create(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5); + + AbstractHBaseTableTest.startMiniCluster(); + + txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); + txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); + txManager.startAndWait(); + } @AfterClass public static void shutdownAfterClass() throws Exception { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before @@ -243,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { result = txTable.get(new Get(TestBytes.row)); txContext.finish(); assertTrue(result.isEmpty()); - + // test column delete // load 10 rows txContext.start(); @@ -395,6 +425,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } + @Test + public void testFamilyDeleteWithCompaction() throws Exception { + HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"), + new byte[][]{TestBytes.family, TestBytes.family2}); + try { + TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW); + TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); + + txContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + + put = new Put(TestBytes.row2); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + txContext.finish(); + + txContext.start(); + Result result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertFalse(result.isEmpty()); + + txContext.start(); + // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete) + Delete delete = new Delete(TestBytes.row); + delete.deleteFamily(TestBytes.family); + txTable.delete(delete); + txContext.finish(); + + txContext.start(); + result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertTrue(result.isEmpty()); + + boolean compactionDone = false; + int count = 0; + while (count++ < 12 && !compactionDone) { + // run major compaction and verify the row was removed + HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin(); + hbaseAdmin.flush("TestFamilyDeleteWithCompaction"); + hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction"); + hbaseAdmin.close(); + Thread.sleep(5000L); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row); + scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + compactionDone = scanner.next() == null; + scanner.close(); + } + assertTrue("Compaction should have removed the row", compactionDone); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row2); + scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + Result res = scanner.next(); + assertNotNull(res); + } finally { + hTable.close(); + } + } + /** * Test aborted transactional delete requests, that must be rolled back. * http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index a3b902a..9a617a9 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -291,6 +291,7 @@ public class TransactionVisibilityFilter extends FilterBase { private static final class DeleteTracker { private long familyDeleteTs; + private byte[] rowKey; public static boolean isFamilyDelete(Cell cell) { return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && @@ -300,14 +301,17 @@ public class TransactionVisibilityFilter extends FilterBase { public void addFamilyDelete(Cell delete) { this.familyDeleteTs = delete.getTimestamp(); + this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength()); } public boolean isDeleted(Cell cell) { - return cell.getTimestamp() <= familyDeleteTs; + return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs; } public void reset() { this.familyDeleteTs = 0; + this.rowKey = null; } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 2922cf3..68324c9 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -19,14 +19,19 @@ package org.apache.tephra.hbase; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.OperationWithAttributes; @@ -42,23 +47,29 @@ import org.apache.hadoop.hbase.filter.LongComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; +import org.apache.tephra.TxConstants.ConflictDetection; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.InMemoryTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; - + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static MiniDFSCluster dfsCluster; + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); @@ -135,9 +151,21 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } - @BeforeClass - public static void setupBeforeClass() throws Exception { - txStateStorage = new InMemoryTransactionStateStorage(); + // override AbstractHBaseTableTest.startMiniCluster to setup configuration + @BeforeClass + public static void startMiniCluster() throws Exception { + // setup the configuration to use HDFSTransactionStateStorage + conf = HBaseConfiguration.create(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5); + + AbstractHBaseTableTest.startMiniCluster(); + + txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); txManager.startAndWait(); } @@ -147,6 +175,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before @@ -242,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { result = txTable.get(new Get(TestBytes.row)); txContext.finish(); assertTrue(result.isEmpty()); - + // test column delete // load 10 rows txContext.start(); @@ -347,6 +378,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } + @Test + public void testFamilyDeleteWithCompaction() throws Exception { + HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"), + new byte[][]{TestBytes.family, TestBytes.family2}); + try { + TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW); + TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); + + txContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + + put = new Put(TestBytes.row2); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + txContext.finish(); + + txContext.start(); + Result result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertFalse(result.isEmpty()); + + txContext.start(); + // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete) + Delete delete = new Delete(TestBytes.row); + delete.deleteFamily(TestBytes.family); + txTable.delete(delete); + txContext.finish(); + + txContext.start(); + result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertTrue(result.isEmpty()); + + boolean compactionDone = false; + int count = 0; + while (count++ < 12 && !compactionDone) { + // run major compaction and verify the row was removed + HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin(); + hbaseAdmin.flush("TestFamilyDeleteWithCompaction"); + hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction"); + hbaseAdmin.close(); + Thread.sleep(5000L); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row); + scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + compactionDone = scanner.next() == null; + scanner.close(); + } + assertTrue("Compaction should have removed the row", compactionDone); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row2); + scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + Result res = scanner.next(); + assertNotNull(res); + } finally { + hTable.close(); + } + } + /** * Test that put and delete attributes are preserved * http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 541e019..9825ded 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -19,6 +19,7 @@ package org.apache.tephra.hbase.coprocessor; import com.google.common.collect.Maps; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TxConstants; import org.apache.tephra.util.TxUtils; @@ -34,6 +36,7 @@ import org.apache.tephra.util.TxUtils; import java.io.IOException; import java.util.List; import java.util.Map; + import javax.annotation.Nullable; /** @@ -291,6 +294,7 @@ public class TransactionVisibilityFilter extends FilterBase { private static final class DeleteTracker { private long familyDeleteTs; + private byte[] rowKey; public static boolean isFamilyDelete(Cell cell) { return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && @@ -300,14 +304,17 @@ public class TransactionVisibilityFilter extends FilterBase { public void addFamilyDelete(Cell delete) { this.familyDeleteTs = delete.getTimestamp(); + this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength()); } public boolean isDeleted(Cell cell) { - return cell.getTimestamp() <= familyDeleteTs; + return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs; } public void reset() { this.familyDeleteTs = 0; + this.rowKey = null; } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 0dc483d..defad80 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -19,14 +19,19 @@ package org.apache.tephra.hbase; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.OperationWithAttributes; @@ -42,23 +47,29 @@ import org.apache.hadoop.hbase.filter.LongComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; +import org.apache.tephra.TxConstants.ConflictDetection; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.InMemoryTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; - + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static MiniDFSCluster dfsCluster; + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); @@ -135,9 +151,21 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } - @BeforeClass - public static void setupBeforeClass() throws Exception { - txStateStorage = new InMemoryTransactionStateStorage(); + //override AbstractHBaseTableTest.startMiniCluster to setup configuration + @BeforeClass + public static void startMiniCluster() throws Exception { + // setup the configuration to use HDFSTransactionStateStorage + conf = HBaseConfiguration.create(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5); + + AbstractHBaseTableTest.startMiniCluster(); + + txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); txManager.startAndWait(); } @@ -147,6 +175,9 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before @@ -242,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { result = txTable.get(new Get(TestBytes.row)); txContext.finish(); assertTrue(result.isEmpty()); - + // test column delete // load 10 rows txContext.start(); @@ -394,6 +425,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } + @Test + public void testFamilyDeleteWithCompaction() throws Exception { + HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"), + new byte[][]{TestBytes.family, TestBytes.family2}); + try { + TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW); + TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); + + txContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + + put = new Put(TestBytes.row2); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + txContext.finish(); + + txContext.start(); + Result result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertFalse(result.isEmpty()); + + txContext.start(); + // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete) + Delete delete = new Delete(TestBytes.row); + delete.deleteFamily(TestBytes.family); + txTable.delete(delete); + txContext.finish(); + + txContext.start(); + result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertTrue(result.isEmpty()); + + boolean compactionDone = false; + int count = 0; + while (count++ < 12 && !compactionDone) { + // run major compaction and verify the row was removed + HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin(); + hbaseAdmin.flush("TestFamilyDeleteWithCompaction"); + hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction"); + hbaseAdmin.close(); + Thread.sleep(5000L); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row); + scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + compactionDone = scanner.next() == null; + scanner.close(); + } + assertTrue("Compaction should have removed the row", compactionDone); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row2); + scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + Result res = scanner.next(); + assertNotNull(res); + } finally { + hTable.close(); + } + } + /** * Test aborted transactional delete requests, that must be rolled back. * http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 541e019..9825ded 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -19,6 +19,7 @@ package org.apache.tephra.hbase.coprocessor; import com.google.common.collect.Maps; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TxConstants; import org.apache.tephra.util.TxUtils; @@ -34,6 +36,7 @@ import org.apache.tephra.util.TxUtils; import java.io.IOException; import java.util.List; import java.util.Map; + import javax.annotation.Nullable; /** @@ -291,6 +294,7 @@ public class TransactionVisibilityFilter extends FilterBase { private static final class DeleteTracker { private long familyDeleteTs; + private byte[] rowKey; public static boolean isFamilyDelete(Cell cell) { return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && @@ -300,14 +304,17 @@ public class TransactionVisibilityFilter extends FilterBase { public void addFamilyDelete(Cell delete) { this.familyDeleteTs = delete.getTimestamp(); + this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength()); } public boolean isDeleted(Cell cell) { - return cell.getTimestamp() <= familyDeleteTs; + return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs; } public void reset() { this.familyDeleteTs = 0; + this.rowKey = null; } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index e2fadbd..30fcd8a 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -19,14 +19,19 @@ package org.apache.tephra.hbase; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.OperationWithAttributes; @@ -42,23 +47,29 @@ import org.apache.hadoop.hbase.filter.LongComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; +import org.apache.tephra.TxConstants.ConflictDetection; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.InMemoryTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +102,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; - + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static MiniDFSCluster dfsCluster; + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); @@ -135,18 +151,33 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } + //override AbstractHBaseTableTest.startMiniCluster to setup configuration @BeforeClass - public static void setupBeforeClass() throws Exception { - txStateStorage = new InMemoryTransactionStateStorage(); - txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); - txManager.startAndWait(); - } + public static void startMiniCluster() throws Exception { + // setup the configuration to use HDFSTransactionStateStorage + conf = HBaseConfiguration.create(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5); + + AbstractHBaseTableTest.startMiniCluster(); + + txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); + txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); + txManager.startAndWait(); + } @AfterClass public static void shutdownAfterClass() throws Exception { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before @@ -242,7 +273,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { result = txTable.get(new Get(TestBytes.row)); txContext.finish(); assertTrue(result.isEmpty()); - + // test column delete // load 10 rows txContext.start(); @@ -394,6 +425,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } + @Test + public void testFamilyDeleteWithCompaction() throws Exception { + HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"), + new byte[][]{TestBytes.family, TestBytes.family2}); + try { + TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW); + TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); + + txContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + + put = new Put(TestBytes.row2); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + txContext.finish(); + + txContext.start(); + Result result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertFalse(result.isEmpty()); + + txContext.start(); + // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete) + Delete delete = new Delete(TestBytes.row); + delete.deleteFamily(TestBytes.family); + txTable.delete(delete); + txContext.finish(); + + txContext.start(); + result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertTrue(result.isEmpty()); + + boolean compactionDone = false; + int count = 0; + while (count++ < 12 && !compactionDone) { + // run major compaction and verify the row was removed + HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin(); + hbaseAdmin.flush("TestFamilyDeleteWithCompaction"); + hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction"); + hbaseAdmin.close(); + Thread.sleep(5000L); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row); + scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + compactionDone = scanner.next() == null; + scanner.close(); + } + assertTrue("Compaction should have removed the row", compactionDone); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row2); + scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + Result res = scanner.next(); + assertNotNull(res); + } finally { + hTable.close(); + } + } + /** * Test aborted transactional delete requests, that must be rolled back. * http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index a258972..5ad7c29 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TxConstants; import org.apache.tephra.util.TxUtils; @@ -286,6 +287,7 @@ public class TransactionVisibilityFilter extends FilterBase { private static final class DeleteTracker { private long familyDeleteTs; + private byte[] rowKey; public static boolean isFamilyDelete(Cell cell) { return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && @@ -295,14 +297,17 @@ public class TransactionVisibilityFilter extends FilterBase { public void addFamilyDelete(Cell delete) { this.familyDeleteTs = delete.getTimestamp(); + this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength()); } public boolean isDeleted(Cell cell) { - return cell.getTimestamp() <= familyDeleteTs; + return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs; } public void reset() { this.familyDeleteTs = 0; + this.rowKey = null; } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/6edfa091/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 46ac384..11ffd1a 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -19,14 +19,17 @@ package org.apache.tephra.hbase; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.OperationWithAttributes; @@ -42,23 +45,29 @@ import org.apache.hadoop.hbase.filter.LongComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; +import org.apache.tephra.TxConstants.ConflictDetection; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; import org.apache.tephra.persist.InMemoryTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,6 +100,15 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + + private static MiniDFSCluster dfsCluster; + + public static void tearDownAfterClass() throws Exception { + dfsCluster.shutdown(); + } private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); @@ -137,7 +155,32 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { @BeforeClass public static void setupBeforeClass() throws Exception { - txStateStorage = new InMemoryTransactionStateStorage(); + testUtil = new HBaseTestingUtility(); + conf = testUtil.getConfiguration(); + + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath()); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath()); + + conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5); + + // Tune down the connection thread pool size + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 10); + // Tunn down handler threads in regionserver + conf.setInt("hbase.regionserver.handler.count", 10); + + // Set to random port + conf.setInt("hbase.master.port", 0); + conf.setInt("hbase.master.info.port", 0); + conf.setInt("hbase.regionserver.port", 0); + conf.setInt("hbase.regionserver.info.port", 0); + + testUtil.startMiniCluster(); + hBaseAdmin = testUtil.getHBaseAdmin(); + txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); txManager.startAndWait(); } @@ -241,7 +284,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { result = txTable.get(new Get(TestBytes.row)); txContext.finish(); assertTrue(result.isEmpty()); - + // test column delete // load 10 rows txContext.start(); @@ -391,6 +434,75 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { } } + @Test + public void testFamilyDeleteWithCompaction() throws Exception { + HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"), + new byte[][]{TestBytes.family, TestBytes.family2}); + try { + TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW); + TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); + + txContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + + put = new Put(TestBytes.row2); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + txTable.put(put); + txContext.finish(); + + txContext.start(); + Result result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertFalse(result.isEmpty()); + + txContext.start(); + // test family delete with ConflictDetection.ROW (as ConflictDetection.COLUMN converts this to a column delete) + Delete delete = new Delete(TestBytes.row); + delete.deleteFamily(TestBytes.family); + txTable.delete(delete); + txContext.finish(); + + txContext.start(); + result = txTable.get(new Get(TestBytes.row)); + txContext.finish(); + assertTrue(result.isEmpty()); + + boolean compactionDone = false; + int count = 0; + while (count++ < 12 && !compactionDone) { + // run major compaction and verify the row was removed + HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin(); + hbaseAdmin.flush("TestFamilyDeleteWithCompaction"); + hbaseAdmin.majorCompact("TestFamilyDeleteWithCompaction"); + hbaseAdmin.close(); + Thread.sleep(5000L); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row); + scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + compactionDone = scanner.next() == null; + scanner.close(); + } + assertTrue("Compaction should have removed the row", compactionDone); + + Scan scan = new Scan(); + scan.setStartRow(TestBytes.row2); + scan.setStopRow(Bytes.add(TestBytes.row2, new byte[] { 0 })); + scan.setRaw(true); + + ResultScanner scanner = hTable.getScanner(scan); + Result res = scanner.next(); + assertNotNull(res); + } finally { + hTable.close(); + } + } + /** * Test aborted transactional delete requests, that must be rolled back. *
