Hi Thomas,

I have attached a diff with which the test passes. The only changes I made
(in addition to decreasing the snapshot interval) were to invoke compact/flush
again on each retry, so that the new snapshot is picked up, and to increase
the sleep interval.
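
Condensed from the attached patch, the retry loop now looks roughly like
this (the table name and timings are simply what worked for me locally):

    boolean compactionDone = false;
    int count = 0;
    while (count++ < 5 && !compactionDone) {
      Thread.sleep(10000L);  // give the snapshot write and compaction time to complete
      Scan scan = new Scan();
      scan.setStartRow(TestBytes.row);
      scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
      scan.setRaw(true);     // raw scan, so delete markers would be visible too
      ResultScanner scanner = hTable.getScanner(scan);
      compactionDone = scanner.next() == null;  // nothing left => the row was cleaned up
      scanner.close();
      // re-trigger compact/flush so the next pass runs against the latest tx snapshot
      HBaseAdmin admin = testUtil.getHBaseAdmin();
      admin.majorCompact("TestValidTransactionalDelete");
      admin.flush("TestValidTransactionalDelete");
      admin.close();
    }
    assertTrue("Compaction should have removed the row", compactionDone);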

Thanks,
Gokul

On Thu, Jan 5, 2017 at 12:25 PM, Thomas D'Silva <[email protected]>
wrote:

> Thanks Gokul!
>
> On Thu, Jan 5, 2017 at 12:17 PM, Gokul Gunasekaran <[email protected]>
> wrote:
>
> > Hi Thomas,
> >
> > Your setup looks right. The only thing missing was decreasing the snapshot
> > interval of the Tx Manager, which defaults to 300 seconds.
> >
> > I added this line after setting the tx snapshot dir:
> >
> >     conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 2L);
> >
> > And the test passes now. The coprocessors use the snapshot data to decide
> > what to clean up.
> >
> > Thanks,
> > Gokul
> >
> > On Wed, Jan 4, 2017 at 4:24 PM, Thomas D'Silva <[email protected]>
> > wrote:
> >
> > > Hi,
> > >
> > > I am trying to debug a Phoenix test failure that deletes rows from a
> > > transactional table and then runs major compaction to remove the deleted
> > > rows. The rows are not getting removed after major compaction is run. I
> > > think it might be related to my test setup. I have attached a patch that
> > > modifies TransactionAwareHTableTest to demonstrate my setup.
> > >
> > > Can anyone let me know if there is anything wrong with my test setup?
> > >
> > > Thank you,
> > > Thomas
> > >
> >
>
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 0b90d7f..274ebad 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -457,6 +457,7 @@ public class TransactionManager extends AbstractService {
         persistor.writeSnapshot(snapshot);
         lastSnapshotTime = snapshotTime;
 
+        LOG.info("Writing snapshot : " + snapshot.getTimestamp());
         // clean any obsoleted snapshots and WALs
        long oldestRetainedTimestamp = persistor.deleteOldSnapshots(snapshotRetainCount);
         persistor.deleteLogsOlderThan(oldestRetainedTimestamp);
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 0f60910..316f4ff 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -259,7 +259,9 @@ public class TransactionProcessor extends BaseRegionObserver {
   public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
                                              KeyValueScanner memstoreScanner, InternalScanner scanner)
       throws IOException {
-    return createStoreScanner(c.getEnvironment(), "flush", cache.getLatestState(), store,
+    TransactionVisibilityState state = cache.getLatestState();
+    LOG.info("VisibilityState timestamp " + state.getTimestamp());
+    return createStoreScanner(c.getEnvironment(), "flush", state, store,
                               Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
                               HConstants.OLDEST_TIMESTAMP);
   }
@@ -269,7 +271,9 @@ public class TransactionProcessor extends BaseRegionObserver {
       List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
       CompactionRequest request)
       throws IOException {
-    return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners,
+    TransactionVisibilityState state = cache.getLatestState();
+    LOG.info("VisibilityState timestamp " + state.getTimestamp());
+    return createStoreScanner(c.getEnvironment(), "compaction", state, store, scanners,
                               scanType, earliestPutTs);
   }
 
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 02a8b7d..5e0e125 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.filter.LongComparator;
 import org.apache.hadoop.hbase.filter.ValueFilter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
@@ -58,14 +59,17 @@ import org.apache.tephra.TxConstants;
 import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,6 +104,16 @@ public class TransactionAwareHTableTest {
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
   private HTable hTable;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private static MiniDFSCluster dfsCluster;
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    dfsCluster.shutdown();
+  }
 
   private static final class TestBytes {
     private static final byte[] table = Bytes.toBytes("testtable");
@@ -148,6 +162,14 @@
   public static void setupBeforeClass() throws Exception {
     testUtil = new HBaseTestingUtility();
     conf = testUtil.getConfiguration();
+    
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.newFolder().getAbsolutePath());
+    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    
+    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+    conf.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 2L);
 
     // Tune down the connection thread pool size
     conf.setInt("hbase.hconnection.threads.core", 5);
@@ -163,7 +185,7 @@
 
     testUtil.startMiniCluster();
     hBaseAdmin = testUtil.getHBaseAdmin();
-    txStateStorage = new InMemoryTransactionStateStorage();
+    txStateStorage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
     txManager.startAndWait();
   }
@@ -295,6 +317,32 @@
       result = txTable.get(new Get(TestBytes.row));
       txContext.finish();
       assertTrue(result.isEmpty());
+
+      // run major compaction and verify the row was removed
+      HBaseAdmin hbaseAdmin = testUtil.getHBaseAdmin();
+      hbaseAdmin.majorCompact("TestValidTransactionalDelete");
+      hbaseAdmin.flush("TestValidTransactionalDelete");
+      hbaseAdmin.close();
+
+      boolean compactionDone = false;
+      int count = 0;
+      while (count++ < 5 && !compactionDone) {
+        Thread.sleep(10000L);
+        Scan scan = new Scan();
+        scan.setStartRow(TestBytes.row);
+        scan.setStopRow(Bytes.add(TestBytes.row, new byte[] { 0 }));
+        scan.setRaw(true);
+
+        ResultScanner scanner = hTable.getScanner(scan);
+        compactionDone = scanner.next() == null;
+        scanner.close();
+
+        hbaseAdmin = testUtil.getHBaseAdmin();
+        hbaseAdmin.majorCompact("TestValidTransactionalDelete");
+        hbaseAdmin.flush("TestValidTransactionalDelete");
+        hbaseAdmin.close();
+      }
+      assertTrue("Compaction should have removed the row", compactionDone);
 
       // test column delete
       // load 10 rows
