Hi,
I am trying to debug a Phoenix test failure that deletes rows from a
transactional table and then runs major compaction to remove the deleted
rows. The rows are not getting removed after major compaction is run. I
think it might be related to my test setup. I have attached a patch that
modifies TransactionAwareHTableTest to demonstrate my setup.
Can anyone let me know if there is anything wrong with my test setup?
Thank you,
Thomas
diff --git
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 02a8b7d..961df7c 100644
---
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -19,6 +19,7 @@ package org.apache.tephra.hbase;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.filter.LongComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionContext;
@@ -58,14 +60,18 @@ import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.InMemoryTransactionStateStorage;
import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,6 +106,15 @@ public class TransactionAwareHTableTest {
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
private HTable hTable;
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private static MiniDFSCluster dfsCluster;
+
+ public static void tearDownAfterClass() throws Exception {
+ dfsCluster.shutdown();
+ }
private static final class TestBytes {
private static final byte[] table = Bytes.toBytes("testtable");
@@ -148,6 +163,12 @@ public class TransactionAwareHTableTest {
public static void setupBeforeClass() throws Exception {
testUtil = new HBaseTestingUtility();
conf = testUtil.getConfiguration();
+
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
tmpFolder.newFolder().getAbsolutePath());
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+ conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR,
tmpFolder.newFolder().getAbsolutePath());
// Tune down the connection thread pool size
conf.setInt("hbase.hconnection.threads.core", 5);
@@ -163,7 +184,7 @@ public class TransactionAwareHTableTest {
testUtil.startMiniCluster();
hBaseAdmin = testUtil.getHBaseAdmin();
- txStateStorage = new InMemoryTransactionStateStorage();
+ txStateStorage = new HDFSTransactionStateStorage(conf, new
SnapshotCodecProvider(conf), new TxMetricsCollector());
txManager = new TransactionManager(conf, txStateStorage, new
TxMetricsCollector());
txManager.startAndWait();
}
@@ -295,6 +316,27 @@ public class TransactionAwareHTableTest {
result = txTable.get(new Get(TestBytes.row));
txContext.finish();
assertTrue(result.isEmpty());
+
+ // run major compaction and verify the row was removed
+ HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+ hbaseAdmin.flush("TestValidTransactionalDelete");
+ hbaseAdmin.majorCompact("TestValidTransactionalDelete");
+ hbaseAdmin.close();
+
+ boolean compactionDone = false;
+ int count = 0;
+ while (count++ < 5 && !compactionDone) {
+ Thread.sleep(2000L);
+ Scan scan = new Scan();
+ scan.setStartRow(TestBytes.row);
+ scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+ scan.setRaw(true);
+
+ ResultScanner scanner = hTable.getScanner(scan);
+ compactionDone = scanner.next() == null;
+ scanner.close();
+ }
+ assertTrue("Compaction should have removed the row", compactionDone);
// test column delete
// load 10 rows