Hi Thomas,
I have attached a diff in which the test passes. The only changes I made
(in addition to decreasing the snapshot interval) were to invoke
compact/flush again (so that the new snapshot gets picked up) and to
increase the sleep interval. After these changes, the test passes.
Thanks,
Gokul
On Thu, Jan 5, 2017 at 12:25 PM, Thomas D'Silva <[email protected]>
wrote:
> Thanks Gokul!
>
> On Thu, Jan 5, 2017 at 12:17 PM, Gokul Gunasekaran <[email protected]>
> wrote:
>
> > Hi Thomas,
> >
> > Your setup looks right. The only thing it was missing was decreasing the
> > snapshot interval of the Tx Manager, which is set to 300 seconds by default.
> >
> > I added this line after setting the tx snapshot dir:
> >
> > conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 2L);
> >
> > And the test passes now. The coprocessors use the snapshot data to clean
> > up data.
> >
> > Thanks,
> > Gokul
> >
> > On Wed, Jan 4, 2017 at 4:24 PM, Thomas D'Silva <[email protected]>
> > wrote:
> >
> > > Hi,
> > >
> > > I am trying to debug a failing Phoenix test that deletes rows from a
> > > transactional table and then runs major compaction to remove the deleted
> > > rows. The rows are not getting removed after major compaction is run. I
> > > think it might be related to my test setup. I have attached a patch that
> > > modifies TransactionAwareHTableTest to demonstrate my setup.
> > >
> > > Can anyone let me know if there is anything wrong with my test setup?
> > >
> > > Thank you,
> > > Thomas
> > >
> >
>
diff --git
a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 0b90d7f..274ebad 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -457,6 +457,7 @@ public class TransactionManager extends AbstractService {
persistor.writeSnapshot(snapshot);
lastSnapshotTime = snapshotTime;
+ LOG.info("Writing snapshot : " + snapshot.getTimestamp());
// clean any obsoleted snapshots and WALs
long oldestRetainedTimestamp =
persistor.deleteOldSnapshots(snapshotRetainCount);
persistor.deleteLogsOlderThan(oldestRetainedTimestamp);
diff --git
a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 0f60910..316f4ff 100644
---
a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++
b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -259,7 +259,9 @@ public class TransactionProcessor extends
BaseRegionObserver {
public InternalScanner
preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store
store,
KeyValueScanner memstoreScanner,
InternalScanner scanner)
throws IOException {
- return createStoreScanner(c.getEnvironment(), "flush",
cache.getLatestState(), store,
+ TransactionVisibilityState state = cache.getLatestState();
+ LOG.info("VisibilityState timestamp " + state.getTimestamp());
+ return createStoreScanner(c.getEnvironment(), "flush", state, store,
Collections.singletonList(memstoreScanner),
ScanType.COMPACT_RETAIN_DELETES,
HConstants.OLDEST_TIMESTAMP);
}
@@ -269,7 +271,9 @@ public class TransactionProcessor extends
BaseRegionObserver {
List<? extends KeyValueScanner> scanners, ScanType scanType, long
earliestPutTs, InternalScanner s,
CompactionRequest request)
throws IOException {
- return createStoreScanner(c.getEnvironment(), "compaction",
cache.getLatestState(), store, scanners,
+ TransactionVisibilityState state = cache.getLatestState();
+ LOG.info("VisibilityState timestamp " + state.getTimestamp());
+ return createStoreScanner(c.getEnvironment(), "compaction", state, store,
scanners,
scanType, earliestPutTs);
}
diff --git
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 02a8b7d..5e0e125 100644
---
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.filter.LongComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionContext;
@@ -58,14 +59,17 @@ import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
import org.apache.tephra.inmemory.InMemoryTxSystemClient;
import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,6 +104,15 @@ public class TransactionAwareHTableTest {
private TransactionContext transactionContext;
private TransactionAwareHTable transactionAwareHTable;
private HTable hTable;
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private static MiniDFSCluster dfsCluster;
+
+ public static void tearDownAfterClass() throws Exception {
+ dfsCluster.shutdown();
+ }
private static final class TestBytes {
private static final byte[] table = Bytes.toBytes("testtable");
@@ -148,6 +161,14 @@ public class TransactionAwareHTableTest {
public static void setupBeforeClass() throws Exception {
testUtil = new HBaseTestingUtility();
conf = testUtil.getConfiguration();
+
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
tmpFolder.newFolder().getAbsolutePath());
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+ conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR,
tmpFolder.newFolder().getAbsolutePath());
+
+ conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 2L);
// Tune down the connection thread pool size
conf.setInt("hbase.hconnection.threads.core", 5);
@@ -163,7 +184,7 @@ public class TransactionAwareHTableTest {
testUtil.startMiniCluster();
hBaseAdmin = testUtil.getHBaseAdmin();
- txStateStorage = new InMemoryTransactionStateStorage();
+ txStateStorage = new HDFSTransactionStateStorage(conf, new
SnapshotCodecProvider(conf), new TxMetricsCollector());
txManager = new TransactionManager(conf, txStateStorage, new
TxMetricsCollector());
txManager.startAndWait();
}
@@ -295,6 +316,32 @@ public class TransactionAwareHTableTest {
result = txTable.get(new Get(TestBytes.row));
txContext.finish();
assertTrue(result.isEmpty());
+
+ // run major compaction and verify the row was removed
+ HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+ hbaseAdmin.majorCompact("TestValidTransactionalDelete");
+ hbaseAdmin.flush("TestValidTransactionalDelete");
+ hbaseAdmin.close();
+
+ boolean compactionDone = false;
+ int count = 0;
+ while (count++ < 5 && !compactionDone) {
+ Thread.sleep(10000L);
+ Scan scan = new Scan();
+ scan.setStartRow(TestBytes.row);
+ scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+ scan.setRaw(true);
+
+ ResultScanner scanner = hTable.getScanner(scan);
+ compactionDone = scanner.next() == null;
+ scanner.close();
+
+ hbaseAdmin = testUtil.getHBaseAdmin();
+ hbaseAdmin.majorCompact("TestValidTransactionalDelete");
+ hbaseAdmin.flush("TestValidTransactionalDelete");
+ hbaseAdmin.close();
+ }
+ assertTrue("Compaction should have removed the row", compactionDone);
// test column delete
// load 10 rows