Repository: incubator-tephra
Updated Branches:
  refs/heads/master fd6ef73d3 -> 8f958edb6


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
new file mode 100644
index 0000000..ac5e923
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.hbase.AbstractHBaseTableTest;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.txprune.TransactionPruningPlugin;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Test invalid list pruning
+ */
+public class InvalidListPruneTest extends AbstractHBaseTableTest {
+  private static final byte[] family = Bytes.toBytes("f1");
+  private static final byte[] qualifier = Bytes.toBytes("col1");
+  private static final int MAX_ROWS = 1000;
+
+  private static TableName txDataTable1;
+  private static TableName pruneStateTable;
+  private static DataJanitorState dataJanitorState;
+
+  /**
+   * Starts the HBase mini cluster with invalid list pruning enabled, then writes {@code MAX_ROWS} rows into
+   * {@code txDataTable1} within a single transaction and flushes them, so that the tests have data to compact.
+   */
+  // Override AbstractHBaseTableTest.startMiniCluster to setup configuration
+  @BeforeClass
+  public static void startMiniCluster() throws Exception {
+    // Setup the configuration to start HBase cluster with the invalid list pruning enabled
+    conf = HBaseConfiguration.create();
+    conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to table quickly, so that tests don't have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
+    AbstractHBaseTableTest.startMiniCluster();
+
+    TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage();
+    TransactionManager txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector());
+    txManager.startAndWait();
+    try {
+      // Do some transactional data operations
+      txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
+      HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+                                  Collections.singletonList(TestTransactionProcessor.class.getName()));
+      try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
+        TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
+        txContext.start();
+        for (int i = 0; i < MAX_ROWS; ++i) {
+          txTable.put(new Put(Bytes.toBytes(i)).addColumn(family, qualifier, Bytes.toBytes(i)));
+        }
+        txContext.finish();
+      }
+
+      testUtil.flush(txDataTable1);
+    } finally {
+      // The transaction manager is only used to load the data above; stop it even when loading fails
+      txManager.stopAndWait();
+    }
+
+    pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                                 TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState =
+      new DataJanitorState(new DataJanitorState.TableSupplier() {
+        @Override
+        public Table get() throws IOException {
+          return testUtil.getConnection().getTable(pruneStateTable);
+        }
+      });
+
+  }
+
+  /**
+   * Drops the transactional data table used by the tests in this class.
+   */
+  @AfterClass
+  public static void shutdownAfterClass() throws Exception {
+    // Class setup may have failed before the table was created; don't mask that failure with a second one
+    if (txDataTable1 == null || !hBaseAdmin.tableExists(txDataTable1)) {
+      return;
+    }
+    // afterTest() disables the table temporarily; if it failed half-way the table may still be disabled
+    if (hBaseAdmin.isTableEnabled(txDataTable1)) {
+      hBaseAdmin.disableTable(txDataTable1);
+    }
+    hBaseAdmin.deleteTable(txDataTable1);
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    createPruneStateTable(); // every test starts with a newly created prune state table
+    InMemoryTransactionStateCache.setTransactionSnapshot(null); // no snapshot until the test installs one
+  }
+
+  /**
+   * Creates the prune state table with {@code DataJanitorState.FAMILY} as its only column family.
+   */
+  private void createPruneStateTable() throws Exception {
+    // Prune state table is a non-transactional table, hence no transaction co-processor
+    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+                               Collections.<String>emptyList());
+    // Only the existence of the table matters here; the returned handle is not used
+    table.close();
+  }
+
+  /**
+   * Per-test cleanup: drops the prune state table while the data table is disabled.
+   */
+  @After
+  public void afterTest() throws Exception {
+    // Disable the data table so that prune writer thread gets stopped,
+    // this makes sure that any cached value will not interfere with next test
+    hBaseAdmin.disableTable(txDataTable1);
+    try {
+      deletePruneStateTable();
+    } finally {
+      // Enabling the table enables the prune writer thread again. This runs even when the delete fails,
+      // otherwise the data table would remain disabled for every test that follows.
+      hBaseAdmin.enableTable(txDataTable1);
+    }
+  }
+
+  /**
+   * Disables and deletes the prune state table; does nothing when the table does not exist.
+   */
+  private void deletePruneStateTable() throws Exception {
+    if (!hBaseAdmin.tableExists(pruneStateTable)) {
+      return;
+    }
+    hBaseAdmin.disableTable(pruneStateTable);
+    hBaseAdmin.deleteTable(pruneStateTable);
+  }
+
+  @Test
+  public void testRecordCompactionState() throws Exception {
+    // No prune upper bound initially
+    Assert.assertEquals(-1,
+                        
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
+
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+                              ImmutableSortedMap.<Long, 
TransactionManager.InProgressTx>of()));
+    // Run minor compaction
+    testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // No prune upper bound after minor compaction too
+    Assert.assertEquals(-1,
+                        
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
+
+    // Run major compaction, and verify prune upper bound
+    testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    Assert.assertEquals(50,
+                        
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
+
+    // Run major compaction again with same snapshot, prune upper bound should 
not change
+    testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    Assert.assertEquals(50,
+                        
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
+
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(110, 111, 112, ImmutableSet.of(150L),
+                              ImmutableSortedMap.of(105L, new 
TransactionManager.InProgressTx(
+                                100, 30, 
TransactionManager.InProgressType.SHORT))));
+    Assert.assertEquals(50,
+                        
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
+
+    // Run major compaction again, now prune upper bound should change
+    testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    Assert.assertEquals(104,
+                        
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
+  }
+
+  /**
+   * Runs a major compaction with a transaction snapshot in place and verifies that the compaction completes,
+   * using the timestamp recorded by {@code TestTransactionProcessor}.
+   */
+  @Test
+  public void testRecordCompactionStateNoTable() throws Exception {
+    // NOTE(review): the test name suggests the prune state table is absent, but beforeTest() creates it and
+    // nothing here removes it - confirm whether a deletePruneStateTable() call is missing
+    // Create a new transaction snapshot
+    InMemoryTransactionStateCache.setTransactionSnapshot(
+      new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L),
+                              ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+    // Run major compaction, and verify it completes
+    long now = System.currentTimeMillis();
+    testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+    // The check is a lower bound, not an equality, so the failure message says so
+    Assert.assertTrue(String.format("Expected last compaction time to be at least %d, but was %d", now,
+                                    lastMajorCompactionTime),
+                      lastMajorCompactionTime >= now);
+  }
+
+  /**
+   * Runs a major compaction while no transaction snapshot is available and verifies that the compaction
+   * still completes, using the timestamp recorded by {@code TestTransactionProcessor}.
+   */
+  @Test
+  public void testRecordCompactionStateNoTxSnapshot() throws Exception {
+    // Test recording state without having a transaction snapshot to make sure we don't disrupt
+    // major compaction in that case
+    InMemoryTransactionStateCache.setTransactionSnapshot(null);
+    // Run major compaction, and verify it completes
+    long now = System.currentTimeMillis();
+    testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get();
+    // The check is a lower bound, not an equality, so the failure message says so
+    Assert.assertTrue(String.format("Expected last compaction time to be at least %d, but was %d", now,
+                                    lastMajorCompactionTime),
+                      lastMajorCompactionTime >= now);
+  }
+
+  /**
+   * Exercises {@code fetchPruneUpperBound} and {@code pruneComplete} of the pruning plugin, first without a
+   * transaction snapshot and then after a major compaction with a snapshot, checking the state stored in
+   * {@code dataJanitorState} after each step.
+   */
+  @Test
+  public void testPruneUpperBound() throws Exception {
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+
+    try {
+      // Every assertion below refers to the region of txDataTable1 that contains row 0, so look it up once
+      byte[] regionName = getRegionName(txDataTable1, Bytes.toBytes(0));
+
+      // Run without a transaction snapshot first
+      long now1 = 200;
+      long inactiveTxTimeNow1 = 150 * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound1 = -1;
+      // fetch prune upper bound
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+
+      TimeRegions expectedRegions1 =
+        new TimeRegions(now1,
+                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+                          .add(regionName)
+                          .build());
+      // Assert prune state is recorded correctly
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getPruneUpperBoundForRegion(regionName));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      // Run prune complete
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Assert prune state was cleaned up correctly based on the prune time
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getPruneUpperBoundForRegion(regionName));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      // Create a new transaction snapshot, and run major compaction on txDataTable1
+      // And run all assertions again
+      long now2 = 300;
+      long inactiveTxTimeNow2 = 250 * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = 200 * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      TimeRegions expectedRegions2 =
+        new TimeRegions(now2,
+                        ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR)
+                          .add(regionName)
+                          .build());
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+
+      // State for both prune runs is present until the second run is completed
+      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+      Assert.assertEquals(expectedPruneUpperBound2, dataJanitorState.getPruneUpperBoundForRegion(regionName));
+      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+      Assert.assertEquals(expectedRegions1, dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      Assert.assertEquals(inactiveTxTimeNow1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+      transactionPruningPlugin.pruneComplete(now2, pruneUpperBound2);
+      Assert.assertEquals(expectedRegions2, dataJanitorState.getRegionsOnOrBeforeTime(now2));
+      Assert.assertEquals(expectedPruneUpperBound2, dataJanitorState.getPruneUpperBoundForRegion(regionName));
+      Assert.assertEquals(inactiveTxTimeNow2, dataJanitorState.getInactiveTransactionBoundForTime(now2));
+      // The state recorded for the first run (time now1) is gone after the second prune completes
+      Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(now1));
+      // NOTE(review): expectedPruneUpperBound1 is reused here only for its value (-1), which is presumably what
+      // getInactiveTransactionBoundForTime returns when nothing is recorded - confirm
+      Assert.assertEquals(expectedPruneUpperBound1, dataJanitorState.getInactiveTransactionBoundForTime(now1));
+
+    } finally {
+      transactionPruningPlugin.destroy();
+    }
+  }
+
+  /**
+   * Verifies how a table without data affects the prune upper bound: before and after the empty table is
+   * flushed, after data is added to it, and after it is finally compacted.
+   */
+  @Test
+  public void testPruneEmptyTable() throws Exception {
+    // Make sure that empty tables do not block the progress of pruning
+
+    // Create an empty table
+    TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
+    HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+                                     Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+    TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
+    transactionPruningPlugin.initialize(conf);
+
+    try {
+      long now1 = System.currentTimeMillis();
+      long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+      long noPruneUpperBound = -1;
+      long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
+                                ImmutableSet.of(expectedPruneUpperBound1),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.compact(txEmptyTable, true);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
+      long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+      // Now flush the empty table, this will record the table region as empty, and then pruning will continue
+      hBaseAdmin.flush(txEmptyTable);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // fetch prune upper bound, again, this time it should work
+      pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+      Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
+      transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
+
+      // Now add some data to the empty table
+      // (adding data non-transactionally is okay too, we just need some data for the compaction to run)
+      emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
+      emptyHTable.close();
+
+      // Now run another compaction on txDataTable1 with an updated tx snapshot
+      long now2 = System.currentTimeMillis();
+      long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
+      long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
+      InMemoryTransactionStateCache.setTransactionSnapshot(
+        new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
+                                ImmutableSet.of(expectedPruneUpperBound2),
+                                ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+
+      // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
+      // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
+      // empty in the previous run with inactiveTxTimeNow1
+      // NOTE(review): the assertion below expects inactiveTxTimeNow1, which (for a positive MAX_TX_PER_MS) is the
+      // larger of the two values named above, while pruneComplete is given expectedPruneUpperBound1 - confirm
+      // which value is intended
+      long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
+
+      // However, after compacting txEmptyTable we should get the latest upper bound
+      testUtil.flush(txEmptyTable);
+      testUtil.compact(txEmptyTable, true);
+      // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
+      pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+      Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
+      transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
+    } finally {
+      try {
+        transactionPruningPlugin.destroy();
+        // Releases the table handle when the test failed before reaching the close() above.
+        // NOTE(review): relies on HTable.close() being a no-op on an already closed table - confirm
+        emptyHTable.close();
+      } finally {
+        // Drop the table even when the cleanup above throws, so that it cannot affect other tests
+        hBaseAdmin.disableTable(txEmptyTable);
+        hBaseAdmin.deleteTable(txEmptyTable);
+      }
+    }
+  }
+
+  /**
+   * Looks up the name of the region of {@code dataTable} that contains {@code row}.
+   *
+   * @param dataTable table whose region is looked up
+   * @param row row key used to locate the region
+   * @return the name of the region that the row maps to
+   * @throws IOException if the region location cannot be fetched
+   */
+  private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException {
+    // RegionLocator is Closeable; release it after the lookup instead of leaking one per call
+    try (org.apache.hadoop.hbase.client.RegionLocator regionLocator =
+           testUtil.getConnection().getRegionLocator(dataTable)) {
+      HRegionLocation regionLocation = regionLocator.getRegionLocation(row);
+      return regionLocation.getRegionInfo().getRegionName();
+    }
+  }
+
+  /**
+   * A transaction co-processor that uses in-memory {@link TransactionSnapshot} for testing
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class TestTransactionProcessor extends TransactionProcessor {
+    // Time (System.currentTimeMillis()) at which postCompact last finished, -1 when it has not run yet.
+    // NOTE(review): despite the name, postCompact below records the time for every compaction it is called
+    // for and does not check whether the request is a major one - confirm this is intended
+    private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
+
+    /**
+     * Supplies a new {@link InMemoryTransactionStateCache} on every call, so that the processor reads the
+     * snapshot installed by the tests.
+     */
+    @Override
+    protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+      return new CacheSupplier<TransactionStateCache>() {
+        @Override
+        public TransactionStateCache get() {
+          return new InMemoryTransactionStateCache();
+        }
+
+        @Override
+        public void release() {
+          // no-op
+        }
+      };
+    }
+
+    /**
+     * Delegates to the parent implementation, then records the current time so that tests can tell that a
+     * compaction has completed.
+     */
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile,
+                            CompactionRequest request) throws IOException {
+      super.postCompact(e, store, resultFile, request);
+      lastMajorCompactionTime.set(System.currentTimeMillis());
+    }
+  }
+
+  /**
+   * Used to supply in-memory {@link TransactionSnapshot} to {@link TestTransactionProcessor} for testing
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class InMemoryTransactionStateCache extends TransactionStateCache {
+    // Shared by all instances. Written by the test methods and read through getLatestState(), which is
+    // presumably invoked on region server threads of the mini cluster; volatile makes a newly installed
+    // snapshot visible to those readers.
+    private static volatile TransactionVisibilityState transactionSnapshot;
+
+    /**
+     * Installs the snapshot returned by {@link #getLatestState()} of every instance.
+     *
+     * @param transactionSnapshot the snapshot to serve, or {@code null} for no snapshot
+     */
+    public static void setTransactionSnapshot(TransactionVisibilityState transactionSnapshot) {
+      InMemoryTransactionStateCache.transactionSnapshot = transactionSnapshot;
+    }
+
+    @Override
+    protected void startUp() throws Exception {
+      // Nothing to do
+    }
+
+    @Override
+    protected void shutDown() throws Exception {
+      // Nothing to do
+    }
+
+    /**
+     * @return the snapshot last passed to {@link #setTransactionSnapshot}, possibly {@code null}
+     */
+    @Override
+    public TransactionVisibilityState getLatestState() {
+      return transactionSnapshot;
+    }
+  }
+
+  /**
+   * Pruning plugin for the tests: a table counts as transactional when it has
+   * {@link TestTransactionProcessor} attached as a co-processor.
+   */
+  @SuppressWarnings("WeakerAccess")
+  public static class TestTransactionPruningPlugin extends HBaseTransactionPruningPlugin {
+    @Override
+    protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
+      return tableDescriptor.hasCoprocessor(TestTransactionProcessor.class.getName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
new file mode 100644
index 0000000..08d3b49
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}.
+ */
+public class PruneUpperBoundWriterSupplierTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class);
+  private static final int NUM_OPS = 10000;
+  private static final int NUM_THREADS = 50;
+
+  @Test
+  public void testSupplier() throws Exception {
+    final PruneUpperBoundWriterSupplier supplier = new 
PruneUpperBoundWriterSupplier(null, null, 10L);
+    // Get one instance now, for later comparisons
+    final PruneUpperBoundWriter writer = supplier.get();
+    final AtomicInteger numOps = new AtomicInteger(NUM_OPS);
+    final Random random = new Random(System.currentTimeMillis());
+
+    // Start threads that will 'get' PruneUpperBoundWriters
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future> futureList = new ArrayList<>();
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // Perform NUM_OPS 'gets' of PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            PruneUpperBoundWriter newWriter = supplier.get();
+            Assert.assertTrue(newWriter == writer);
+            int waitTime = random.nextInt(10);
+            try {
+              TimeUnit.MICROSECONDS.sleep(waitTime);
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(5, TimeUnit.SECONDS);
+    }
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    futureList.clear();
+    numOps.set(NUM_OPS);
+    // Start thread that release PruneUpperBoundWriters
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+    for (int i = 0; i < NUM_THREADS; i++) {
+      futureList.add(executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          // We need to release all NUM_OPS 'gets' that were executed to 
trigger shutdown of the single instance of
+          // PruneUpperBoundWriter
+          while (numOps.decrementAndGet() > 0) {
+            supplier.release();
+            try {
+              TimeUnit.MICROSECONDS.sleep(random.nextInt(10));
+            } catch (InterruptedException e) {
+              LOG.warn("Received an exception.", e);
+            }
+          }
+        }
+      }));
+    }
+
+    for (Future future : futureList) {
+      future.get(1, TimeUnit.SECONDS);
+    }
+
+    executor.shutdown();
+    executor.awaitTermination(2, TimeUnit.SECONDS);
+
+    // Verify that the PruneUpperBoundWriter is still running and the 
pruneThread is still alive.
+    Assert.assertTrue(writer.isRunning());
+    Assert.assertTrue(writer.isAlive());
+
+    // Since we got one instance in the beginning, we need to release it
+    supplier.release();
+
+    // Verify that the PruneUpperBoundWriter is shutdown and the pruneThread 
is not alive anymore.
+    Assert.assertFalse(writer.isRunning());
+    Assert.assertFalse(writer.isAlive());
+  }
+}

Reply via email to