Repository: incubator-tephra Updated Branches: refs/heads/master 13201db73 -> 2af5ac2bd
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index a63cf75..8142601 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -46,6 +46,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; /** @@ -201,6 +202,8 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime); LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime); dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime); + LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime); + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime); } @Override @@ -289,26 +292,40 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); long time = timeRegions.getTime(); - Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); + long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time); + LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound); + // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions + if (inactiveTransactionBound == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " + + "and hence the data must be incomplete", time); + } + continue; + } + + // Get the prune upper bounds for all the transactional regions + Map<byte[], Long> pruneUpperBoundRegions = + dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); logPruneUpperBoundRegions(pruneUpperBoundRegions); + + // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are + // recorded as empty after inactiveTransactionBoundTime will not have invalid data + // for transactions started on or before inactiveTransactionBoundTime + pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions, + pruneUpperBoundRegions); + // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound // across all regions - if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) { - long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time); - LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time); - // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions - if (inactiveTransactionBound != -1) { - Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values()); - return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " + - "and hence the data must be incomplete", time); - } - } + if (!transactionalRegions.isEmpty() && + pruneUpperBoundRegions.size() == transactionalRegions.size()) { + Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values()); + long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions); + LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time); + return pruneUpperBound; } else { if (LOG.isDebugEnabled()) { - Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet()); + Sets.SetView<byte[]> difference = + Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet()); LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}", time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN)); } @@ -319,6 +336,28 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, + SortedSet<byte[]> transactionalRegions, + Map<byte[], Long> pruneUpperBoundRegions) throws IOException { + long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound); + SortedSet<byte[]> emptyRegions = + dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions); + LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}", + inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + + // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data + // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound + // for these empty regions as inactiveTransactionBound + Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR); + pubWithEmptyRegions.putAll(pruneUpperBoundRegions); + for (byte[] emptyRegion : emptyRegions) { + if (!pruneUpperBoundRegions.containsKey(emptyRegion)) { + pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound); + } + } + return Collections.unmodifiableMap(pubWithEmptyRegions); + } + private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) { if (LOG.isDebugEnabled()) { LOG.debug("Got region - prune upper bound map: {}", http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 5a86b4a..beed1ad 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -39,25 +39,41 @@ public class PruneUpperBoundWriter extends AbstractIdleService { private final TableName tableName; private final DataJanitorState dataJanitorState; private final long pruneFlushInterval; + // Map of region name -> prune upper bound private final ConcurrentSkipListMap<byte[], Long> pruneEntries; + // Map of region name -> time the region was found to be empty + private final ConcurrentSkipListMap<byte[], Long> emptyRegions; private volatile Thread flushThread; private long lastChecked; + @SuppressWarnings("WeakerAccess") public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) { this.tableName = tableName; this.dataJanitorState = dataJanitorState; this.pruneFlushInterval = pruneFlushInterval; this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); } + @SuppressWarnings("WeakerAccess") public void persistPruneEntry(byte[] regionName, long pruneUpperBound) { + warnIfNotRunning(regionName); // The number of entries in this map is bound by the number of regions in this region server and thus it will not // grow indefinitely pruneEntries.put(regionName, pruneUpperBound); } + @SuppressWarnings("WeakerAccess") + public void persistRegionEmpty(byte[] regionName, long time) { + warnIfNotRunning(regionName); + // The number of entries in this map is bound by the number of regions in this region server and thus it will not + // grow indefinitely + emptyRegions.put(regionName, time); + } + + @SuppressWarnings("WeakerAccess") public boolean isAlive() { return flushThread != null && flushThread.isAlive(); } @@ -86,13 +102,22 @@ public class PruneUpperBoundWriter extends AbstractIdleService { if (now > (lastChecked + pruneFlushInterval)) { // should flush data try { - while (pruneEntries.firstEntry() != null) { + // Record prune upper bound + while (!pruneEntries.isEmpty()) { Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); // We can now remove the entry only if the key and value match with what we wrote since it is // possible that a new pruneUpperBound for the same key has been added pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); } + // Record empty regions + while (!emptyRegions.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); + dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new value for the same key has been added + emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); + } } catch (IOException ex) { LOG.warn("Cannot record prune upper bound for a region to table " + tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex); @@ -115,4 +140,11 @@ public class PruneUpperBoundWriter extends AbstractIdleService { flushThread.setDaemon(true); flushThread.start(); } + + private void warnIfNotRunning(byte[] regionName) { + if (!isRunning() || !isAlive()) { + LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!", + Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running")); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java index 402892f..b96d87d 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java @@ -156,6 +156,7 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest { } // Verify saved regions + Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0)); Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31)); @@ -163,20 +164,39 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest { dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000)); Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10)); + // Now change the count stored for regions saved at time 0 and 30 + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3); + dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3); + } + // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20 + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Delete regions saved on or before time 30 dataJanitorState.deleteAllRegionsOnOrBeforeTime(30); // Values on or before time 30 should be deleted Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30)); Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Counts should be deleted for time on or before 30 + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30)); + Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0)); + } // Values after time 30 should still exist Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40)); + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40)); + } } @Test public void testSaveInactiveTransactionBoundTime() throws Exception { int maxTime = 100; - // Nothing sould be present in the beginning + // Nothing should be present in the beginning Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); // Save inactive transaction bounds for various time values @@ -202,4 +222,59 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest { Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30)); Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90)); } + + @Test + public void testSaveEmptyRegions() throws Exception { + // Nothing should be present in the beginning + Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + + byte[] region1 = Bytes.toBytes("region1"); + byte[] region2 = Bytes.toBytes("region2"); + byte[] region3 = Bytes.toBytes("region3"); + byte[] region4 = Bytes.toBytes("region4"); + SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4); + + // Now record some empty regions + dataJanitorState.saveEmptyRegionForTime(100, region1); + dataJanitorState.saveEmptyRegionForTime(110, region1); + dataJanitorState.saveEmptyRegionForTime(102, region2); + dataJanitorState.saveEmptyRegionForTime(112, region3); + + Assert.assertEquals(toISet(region1, region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + + Assert.assertEquals(toISet(region1, region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(100, allRegions)); + + Assert.assertEquals(toISet(region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3))); + + Assert.assertEquals(toISet(), + dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of())); + + Assert.assertEquals(toISet(region3), + dataJanitorState.getEmptyRegionsAfterTime(110, allRegions)); + + Assert.assertEquals(toISet(), + dataJanitorState.getEmptyRegionsAfterTime(112, allRegions)); + + // Delete empty regions on or before time 110 + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110); + // Now only region3 should remain + Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions)); + + // Delete empty regions on or before time 150 + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150); + // Now nothing should remain + Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + } + + private ImmutableSortedSet<byte[]> toISet(byte[]... args) { + ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR); + for (byte[] arg : args) { + builder.add(arg); + } + return builder.build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index a431ee3..07746d8 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -72,6 +72,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { private static TableName txDataTable1; private static TableName pruneStateTable; + private static DataJanitorState dataJanitorState; // Override AbstractHBaseTableTest.startMiniCluster to setup configuration @BeforeClass @@ -105,6 +106,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + } @AfterClass @@ -128,7 +137,12 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { @After public void afterTest() throws Exception { + // Disable the data table so that prune writer thread gets stopped, + // this makes sure that any cached value will not interfere with next test + hBaseAdmin.disableTable(txDataTable1); deletePruneStateTable(); + // Enabling the table enables the prune writer thread again + hBaseAdmin.enableTable(txDataTable1); } private void deletePruneStateTable() throws Exception { @@ -138,32 +152,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } - private void truncatePruneStateTable() throws Exception { - if (hBaseAdmin.tableExists(pruneStateTable)) { - if (hBaseAdmin.isTableEnabled(pruneStateTable)) { - hBaseAdmin.disableTable(pruneStateTable); - } - hBaseAdmin.truncateTable(pruneStateTable, true); - } - } - @Test public void testRecordCompactionState() throws Exception { - DataJanitorState dataJanitorState = - new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return testUtil.getConnection().getTable(pruneStateTable); - } - }); - - // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table - TimeUnit.SECONDS.sleep(2); - // Truncate prune state table to clear any data that might have been written by the previous test - // This is required because during the shutdown of the previous test, compaction might have kicked in and the - // coprocessor still had some data to flush and it might be flushed at the beginning of this test. - truncatePruneStateTable(); - // No prune upper bound initially Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); @@ -212,10 +202,6 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { @Test public void testRecordCompactionStateNoTable() throws Exception { - // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table - // and make sure a major compaction succeeds - deletePruneStateTable(); - // Create a new transaction snapshot InMemoryTransactionStateCache.setTransactionSnapshot( new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L), @@ -247,24 +233,9 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { @Test public void testPruneUpperBound() throws Exception { - DataJanitorState dataJanitorState = - new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return testUtil.getConnection().getTable(pruneStateTable); - } - }); - TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); transactionPruningPlugin.initialize(conf); - // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table - TimeUnit.SECONDS.sleep(2); - // Truncate prune state table to clear any data that might have been written by the previous test - // This is required because during the shutdown of the previous test, compaction might have kicked in and the - // coprocessor still had some data to flush and it might be flushed at the beginning of this test. - truncatePruneStateTable(); - try { // Run without a transaction snapshot first long now1 = 200; @@ -334,6 +305,87 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } + @Test + public void testPruneEmptyTable() throws Exception { + // Make sure that empty tables do not block the progress of pruning + + // Create an empty table + TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable"); + HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + testUtil.compact(txEmptyTable, true); + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now flush the empty table, this will record the table region as empty, and then pruning will continue + hBaseAdmin.flush(txEmptyTable); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // fetch prune upper bound, again, this time it should work + pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1); + + // Now add some data to the empty table + // (adding data non-transactionally is okay too, we just need some data for the compaction to run) + emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1))); + emptyHTable.close(); + + // Now run another compaction on txDataTable1 with an updated tx snapshot + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS; + long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2, + ImmutableSet.of(expectedPruneUpperBound2), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + testUtil.flush(txEmptyTable); + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since + // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being + // empty in the previous run with inactiveTxTimeNow1 + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + + // However, after compacting txEmptyTable we should get the latest upper bound + testUtil.flush(txEmptyTable); + testUtil.compact(txEmptyTable, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2); + } finally { + transactionPruningPlugin.destroy(); + hBaseAdmin.disableTable(txEmptyTable); + hBaseAdmin.deleteTable(txEmptyTable); + } + } + private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 015077b..5e1b4c5 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; @@ -310,6 +311,28 @@ public class TransactionProcessor extends BaseRegionObserver { } @Override + public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException { + // Record whether the region is empty after a flush + Region region = e.getEnvironment().getRegion(); + // After a flush, if the memstore size is zero and there are no store files for any stores in the region + // then the region must be empty + long numStoreFiles = numStoreFilesForRegion(e); + long memstoreSize = region.getMemstoreSize(); + LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s", + region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles)); + if (memstoreSize == 0 && numStoreFiles == 0) { + if (pruneEnable == null) { + initPruneState(e); + } + + if (Boolean.TRUE.equals(pruneEnable)) { + compactionState.persistRegionEmpty(System.currentTimeMillis()); + } + } + + } + + @Override public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s, CompactionRequest request) @@ -318,25 +341,7 @@ public class TransactionProcessor extends BaseRegionObserver { TransactionVisibilityState snapshot = cache.getLatestState(); if (pruneEnable == null) { - Configuration conf = getConfiguration(c.getEnvironment()); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily - if (conf != null) { - pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); - if (Boolean.TRUE.equals(pruneEnable)) { - String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - long pruneFlushInterval = TimeUnit.SECONDS.toMillis( - conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, - TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); - compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); - if (LOG.isDebugEnabled()) { - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " - + pruneTable); - } - } - } + initPruneState(c); } if (Boolean.TRUE.equals(pruneEnable)) { @@ -449,6 +454,36 @@ public class TransactionProcessor extends BaseRegionObserver { return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } + private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) { + Configuration conf = getConfiguration(c.getEnvironment()); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + if (Boolean.TRUE.equals(pruneEnable)) { + String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + long pruneFlushInterval = TimeUnit.SECONDS.toMillis( + conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); + compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); + if (LOG.isDebugEnabled()) { + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } + } + } + } + + private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) { + long numStoreFiles = 0; + for (Store store : c.getEnvironment().getRegion().getStores()) { + numStoreFiles += store.getStorefiles().size(); + } + return numStoreFiles; + } + /** * Filter used to include cells visible to in-progress transactions on flush and commit. */ http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index db7880b..9b856d9 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -93,6 +93,17 @@ public class CompactionState { } /** + * Persist that the given region is empty at the given time + * @param time time in milliseconds + */ + public void persistRegionEmpty(long time) { + pruneUpperBoundWriter.persistRegionEmpty(regionName, time); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time)); + } + } + + /** * Releases the usage {@link PruneUpperBoundWriter}. */ public void stop() { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java index 897e00e..fc0ec76 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -19,7 +19,10 @@ package org.apache.tephra.hbase.txprune; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Delete; @@ -35,6 +38,7 @@ import org.apache.tephra.txprune.RegionPruneInfo; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,11 +54,14 @@ import javax.annotation.Nullable; */ @SuppressWarnings("WeakerAccess") public class DataJanitorState { + private static final Log LOG = LogFactory.getLog(DataJanitorState.class); + public static final byte[] FAMILY = {'f'}; public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'}; private static final byte[] REGION_TIME_COL = {'r'}; private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'}; + private static final byte[] EMPTY_REGION_TIME_COL = {'e'}; private static final byte[] REGION_KEY_PREFIX = {0x1}; private static final byte[] REGION_KEY_PREFIX_STOP = {0x2}; @@ -65,7 +72,15 @@ public class DataJanitorState { private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3}; private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4}; + private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4}; + private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5}; + + private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5}; + private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6}; + private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; + // This value can be used when we don't care about the value we write in a column + private static final byte[] COL_VAL = Bytes.toBytes('1'); private final TableSupplier stateTableSupplier; @@ -148,7 +163,7 @@ public class DataJanitorState { for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound()); } - return resultMap; + return Collections.unmodifiableMap(resultMap); } /** @@ -181,7 +196,7 @@ public class DataJanitorState { } } } - return regionPruneInfos; + return Collections.unmodifiableList(regionPruneInfos); } /** @@ -223,7 +238,7 @@ public class DataJanitorState { // --------------------------------------------------- // ------- Methods for regions at a given time ------- // --------------------------------------------------- - // Key: 0x2<time><region-id> + // Key: 0x2<inverted time><region-id> // Col 't': <empty byte array> // --------------------------------------------------- @@ -240,12 +255,22 @@ public class DataJanitorState { try (Table stateTable = stateTableSupplier.get()) { for (byte[] region : regions) { Put put = new Put(makeTimeRegionKey(timeBytes, region)); - put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY); + put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL); stateTable.put(put); } + + // Save the count of regions as a checksum + saveRegionCountForTime(stateTable, timeBytes, regions.size()); } } + @VisibleForTesting + void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException { + Put put = new Put(makeTimeRegionCountKey(timeBytes)); + put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count)); + stateTable.put(put); + } + /** * Return the set of regions saved for the time at or before the given time. This method finds the greatest time * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are @@ -257,32 +282,58 @@ public class DataJanitorState { */ @Nullable public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException { - byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); try (Table stateTable = stateTableSupplier.get()) { - Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); - scan.addColumn(FAMILY, REGION_TIME_COL); + TimeRegions timeRegions; + while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) { + int count = getRegionCountForTime(stateTable, timeRegions.getTime()); + if (count != -1 && count == timeRegions.getRegions().size()) { + return timeRegions; + } else { + LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s", + timeRegions.getTime(), count, timeRegions.getRegions().size())); + time = time - 1; + } + } + return null; + } + } - SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); - long currentRegionTime = -1; - try (ResultScanner scanner = stateTable.getScanner(scan)) { - Result next; - while ((next = scanner.next()) != null) { - Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow()); - // Stop if reached next time value - if (currentRegionTime == -1) { - currentRegionTime = timeRegion.getKey(); - } else if (timeRegion.getKey() < currentRegionTime) { - break; - } else if (timeRegion.getKey() > currentRegionTime) { - throw new IllegalStateException( - String.format("Got out of order time %d when expecting time less than or equal to %d", - timeRegion.getKey(), currentRegionTime)); - } - regions.add(timeRegion.getValue()); + @Nullable + private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + + + long currentRegionTime = -1; + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Result next; + try (ResultScanner scanner = stateTable.getScanner(scan)) { + while ((next = scanner.next()) != null) { + Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow()); + // Stop if reached next time value + if (currentRegionTime == -1) { + currentRegionTime = timeRegion.getKey(); + } else if (timeRegion.getKey() < currentRegionTime) { + break; + } else if (timeRegion.getKey() > currentRegionTime) { + throw new IllegalStateException( + String.format("Got out of order time %d when expecting time less than or equal to %d", + timeRegion.getKey(), currentRegionTime)); } + regions.add(timeRegion.getValue()); } - return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions); } + return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions)); + } + + @VisibleForTesting + int getRegionCountForTime(Table stateTable, long time) throws IOException { + Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time)))); + get.addColumn(FAMILY, REGION_TIME_COL); + Result result = stateTable.get(get); + byte[] value = result.getValue(FAMILY, REGION_TIME_COL); + return value == null ? -1 : Bytes.toInt(value); } /** @@ -294,15 +345,15 @@ public class DataJanitorState { public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException { byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); try (Table stateTable = stateTableSupplier.get()) { + // Delete the regions Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); scan.addColumn(FAMILY, REGION_TIME_COL); + deleteFromScan(stateTable, scan); - try (ResultScanner scanner = stateTable.getScanner(scan)) { - Result next; - while ((next = scanner.next()) != null) { - stateTable.delete(new Delete(next.getRow())); - } - } + // Delete the count + scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + deleteFromScan(stateTable, scan); } } @@ -356,14 +407,82 @@ public class DataJanitorState { Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))), INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP); scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); + deleteFromScan(stateTable, scan); + } + } + + // -------------------------------------------------------- + // ------- Methods for empty regions at a given time ------- + // -------------------------------------------------------- + // Key: 0x4<time><region-id> + // Col 'e': <empty byte array> + // -------------------------------------------------------- + + /** + * Save the given region as empty as of the given time. + * + * @param time time in milliseconds + * @param regionId region id + */ + public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException { + byte[] timeBytes = Bytes.toBytes(time); + try (Table stateTable = stateTableSupplier.get()) { + Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId)); + put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL); + stateTable.put(put); + } + } + + /** + * Return regions that were recorded as empty after the given time. + * + * @param time time in milliseconds + * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set + * and the empty regions after the given time + */ + public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions) + throws IOException { + SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY), + EMPTY_REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL); try (ResultScanner scanner = stateTable.getScanner(scan)) { Result next; while ((next = scanner.next()) != null) { - stateTable.delete(new Delete(next.getRow())); + byte[] emptyRegion = getEmptyRegionFromKey(next.getRow()); + if (includeRegions == null || includeRegions.contains(emptyRegion)) { + emptyRegions.add(emptyRegion); + } } } } + return Collections.unmodifiableSortedSet(emptyRegions); + } + + /** + * Delete empty region records saved on or before the given time. + * + * @param time time in milliseconds + */ + public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(); + scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY)); + scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL); + deleteFromScan(stateTable, scan); + } + } + + @VisibleForTesting + void deleteFromScan(Table stateTable, Scan scan) throws IOException { + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + stateTable.delete(new Delete(next.getRow())); + } + } } private byte[] makeRegionKey(byte[] regionId) { @@ -379,6 +498,10 @@ public class DataJanitorState { return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId); } + private byte[] makeTimeRegionCountKey(byte[] time) { + return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time); + } + private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) { return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time); } @@ -391,6 +514,15 @@ public class DataJanitorState { return Maps.immutableEntry(time, regionName); } + private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) { + return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId); + } + + private byte[] getEmptyRegionFromKey(byte[] key) { + int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG; + return Bytes.copy(key, prefixLen, key.length - prefixLen); + } + private long getInvertedTime(long time) { return Long.MAX_VALUE - time; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 99c514f..84c480a 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -46,6 +46,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; /** @@ -121,7 +122,7 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { public void initialize(Configuration conf) throws IOException { this.conf = conf; this.connection = ConnectionFactory.createConnection(conf); - + final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString()); @@ -200,6 +201,8 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime); LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime); dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime); + LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime); + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime); } @Override @@ -288,26 +291,40 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); long time = timeRegions.getTime(); - Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); + long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time); + LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound); + // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions + if (inactiveTransactionBound == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " + + "and hence the data must be incomplete", time); + } + continue; + } + + // Get the prune upper bounds for all the transactional regions + Map<byte[], Long> pruneUpperBoundRegions = + dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); logPruneUpperBoundRegions(pruneUpperBoundRegions); + + // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are + // recorded as empty after inactiveTransactionBoundTime will not have invalid data + // for transactions started on or before inactiveTransactionBoundTime + pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions, + pruneUpperBoundRegions); + // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound // across all regions - if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) { - long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time); - LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time); - // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions - if (inactiveTransactionBound != -1) { - Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values()); - return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " + - "and hence the data must be incomplete", time); - } - } + if (!transactionalRegions.isEmpty() && + pruneUpperBoundRegions.size() == transactionalRegions.size()) { + Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values()); + long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions); + LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time); + return pruneUpperBound; } else { if (LOG.isDebugEnabled()) { - Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet()); + Sets.SetView<byte[]> difference = + Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet()); LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}", time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN)); } @@ -318,6 +335,28 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, + SortedSet<byte[]> transactionalRegions, + Map<byte[], Long> pruneUpperBoundRegions) throws IOException { + long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound); + SortedSet<byte[]> emptyRegions = + dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions); + LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}", + inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + + // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data + // for transactions started before or on inactiveTransactionBoundTime. Hence we can consider the prune upper bound + // for these empty regions as inactiveTransactionBound + Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR); + pubWithEmptyRegions.putAll(pruneUpperBoundRegions); + for (byte[] emptyRegion : emptyRegions) { + if (!pruneUpperBoundRegions.containsKey(emptyRegion)) { + pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound); + } + } + return Collections.unmodifiableMap(pubWithEmptyRegions); + } + private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) { if (LOG.isDebugEnabled()) { LOG.debug("Got region - prune upper bound map: {}", http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 7e9d1a3..9773a15 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -39,25 +39,41 @@ public class PruneUpperBoundWriter extends AbstractIdleService { private final TableName tableName; private final DataJanitorState dataJanitorState; private final long pruneFlushInterval; + // Map of region name -> prune upper bound private final ConcurrentSkipListMap<byte[], Long> pruneEntries; + // Map of region name -> time the region was found to be empty + private final ConcurrentSkipListMap<byte[], Long> emptyRegions; private volatile Thread flushThread; private long lastChecked; + @SuppressWarnings("WeakerAccess") public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) { this.tableName = tableName; this.dataJanitorState = dataJanitorState; this.pruneFlushInterval = pruneFlushInterval; this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); } + @SuppressWarnings("WeakerAccess") public void persistPruneEntry(byte[] regionName, long pruneUpperBound) { + warnIfNotRunning(regionName); // The number of entries in this map is bound by the number of regions in this region server and thus it will not // grow indefinitely pruneEntries.put(regionName, pruneUpperBound); } + @SuppressWarnings("WeakerAccess") + public void persistRegionEmpty(byte[] regionName, long time) { + warnIfNotRunning(regionName); + // The number of entries in this map is bound by the number of regions in this region server and thus it will not + // grow indefinitely + emptyRegions.put(regionName, time); + } + + @SuppressWarnings("WeakerAccess") public boolean isAlive() { return flushThread != null && flushThread.isAlive(); } @@ -86,13 +102,22 @@ public class PruneUpperBoundWriter extends AbstractIdleService { if (now > (lastChecked + pruneFlushInterval)) { // should flush data try { - while (pruneEntries.firstEntry() != null) { + // Record prune upper bound + while (!pruneEntries.isEmpty()) { Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); // We can now remove the entry only if the key and value match with what we wrote since it is // possible that a new pruneUpperBound for the same key has been added pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); } + // Record empty regions + while (!emptyRegions.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); + dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new value for the same key has been added + emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); + } } catch (IOException ex) { LOG.warn("Cannot record prune upper bound for a region to table " + tableName.getNameWithNamespaceInclAsString(), ex); @@ -115,4 +140,11 @@ public class PruneUpperBoundWriter extends AbstractIdleService { flushThread.setDaemon(true); flushThread.start(); } + + private void warnIfNotRunning(byte[] regionName) { + if (!isRunning() || !isAlive()) { + LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!", + Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running")); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java index 402892f..b96d87d 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java @@ -156,6 +156,7 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest { } // Verify saved regions + Assert.assertEquals(new TimeRegions(0, regionsTime.get(0L)), dataJanitorState.getRegionsOnOrBeforeTime(0)); Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31)); @@ -163,20 +164,39 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest { dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000)); Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10)); + // Now change the count stored for regions saved at time 0 and 30 + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE), 3); + dataJanitorState.saveRegionCountForTime(stateTable, Bytes.toBytes(Long.MAX_VALUE - 30L), 3); + } + // Now querying for time 0 should return null, and querying for time 30 should return regions from time 20 + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(0)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(35)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Delete regions saved on or before time 30 dataJanitorState.deleteAllRegionsOnOrBeforeTime(30); // Values on or before time 30 should be deleted Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30)); Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Counts should be deleted for time on or before 30 + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 30)); + Assert.assertEquals(-1, dataJanitorState.getRegionCountForTime(stateTable, 0)); + } // Values after time 30 should still exist Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40)); + try (Table stateTable = testUtil.getConnection().getTable(pruneStateTable)) { + Assert.assertEquals(5, dataJanitorState.getRegionCountForTime(stateTable, 40)); + } } @Test public void testSaveInactiveTransactionBoundTime() throws Exception { int maxTime = 100; - // Nothing sould be present in the beginning + // Nothing should be present in the beginning Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); // Save inactive transaction bounds for various time values @@ -202,4 +222,59 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest { Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30)); Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90)); } + + @Test + public void testSaveEmptyRegions() throws Exception { + // Nothing should be present in the beginning + Assert.assertEquals(ImmutableSortedSet.<byte[]>of(), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + + byte[] region1 = Bytes.toBytes("region1"); + byte[] region2 = Bytes.toBytes("region2"); + byte[] region3 = Bytes.toBytes("region3"); + byte[] region4 = Bytes.toBytes("region4"); + SortedSet<byte[]> allRegions = toISet(region1, region2, region3, region4); + + // Now record some empty regions + dataJanitorState.saveEmptyRegionForTime(100, region1); + dataJanitorState.saveEmptyRegionForTime(110, region1); + dataJanitorState.saveEmptyRegionForTime(102, region2); + dataJanitorState.saveEmptyRegionForTime(112, region3); + + Assert.assertEquals(toISet(region1, region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + + Assert.assertEquals(toISet(region1, region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(100, allRegions)); + + Assert.assertEquals(toISet(region2, region3), + dataJanitorState.getEmptyRegionsAfterTime(100, toISet(region2, region3))); + + Assert.assertEquals(toISet(), + dataJanitorState.getEmptyRegionsAfterTime(100, ImmutableSortedSet.<byte[]>of())); + + Assert.assertEquals(toISet(region3), + dataJanitorState.getEmptyRegionsAfterTime(110, allRegions)); + + Assert.assertEquals(toISet(), + dataJanitorState.getEmptyRegionsAfterTime(112, allRegions)); + + // Delete empty regions on or before time 110 + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(110); + // Now only region3 should remain + Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + Assert.assertEquals(toISet(region3), dataJanitorState.getEmptyRegionsAfterTime(100, allRegions)); + + // Delete empty regions on or before time 150 + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(150); + // Now nothing should remain + Assert.assertEquals(toISet(), dataJanitorState.getEmptyRegionsAfterTime(-1, null)); + } + + private ImmutableSortedSet<byte[]> toISet(byte[]... args) { + ImmutableSortedSet.Builder<byte[]> builder = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR); + for (byte[] arg : args) { + builder.add(arg); + } + return builder.build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/2af5ac2b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index a431ee3..07746d8 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -72,6 +72,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { private static TableName txDataTable1; private static TableName pruneStateTable; + private static DataJanitorState dataJanitorState; // Override AbstractHBaseTableTest.startMiniCluster to setup configuration @BeforeClass @@ -105,6 +106,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return testUtil.getConnection().getTable(pruneStateTable); + } + }); + } @AfterClass @@ -128,7 +137,12 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { @After public void afterTest() throws Exception { + // Disable the data table so that prune writer thread gets stopped, + // this makes sure that any cached value will not interfere with next test + hBaseAdmin.disableTable(txDataTable1); deletePruneStateTable(); + // Enabling the table enables the prune writer thread again + hBaseAdmin.enableTable(txDataTable1); } private void deletePruneStateTable() throws Exception { @@ -138,32 +152,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } - private void truncatePruneStateTable() throws Exception { - if (hBaseAdmin.tableExists(pruneStateTable)) { - if (hBaseAdmin.isTableEnabled(pruneStateTable)) { - hBaseAdmin.disableTable(pruneStateTable); - } - hBaseAdmin.truncateTable(pruneStateTable, true); - } - } - @Test public void testRecordCompactionState() throws Exception { - DataJanitorState dataJanitorState = - new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return testUtil.getConnection().getTable(pruneStateTable); - } - }); - - // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table - TimeUnit.SECONDS.sleep(2); - // Truncate prune state table to clear any data that might have been written by the previous test - // This is required because during the shutdown of the previous test, compaction might have kicked in and the - // coprocessor still had some data to flush and it might be flushed at the beginning of this test. - truncatePruneStateTable(); - // No prune upper bound initially Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); @@ -212,10 +202,6 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { @Test public void testRecordCompactionStateNoTable() throws Exception { - // To make sure we don't disrupt major compaction prune state table is not present, delete the prune state table - // and make sure a major compaction succeeds - deletePruneStateTable(); - // Create a new transaction snapshot InMemoryTransactionStateCache.setTransactionSnapshot( new TransactionSnapshot(100, 100, 100, ImmutableSet.of(50L), @@ -247,24 +233,9 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { @Test public void testPruneUpperBound() throws Exception { - DataJanitorState dataJanitorState = - new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return testUtil.getConnection().getTable(pruneStateTable); - } - }); - TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); transactionPruningPlugin.initialize(conf); - // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table - TimeUnit.SECONDS.sleep(2); - // Truncate prune state table to clear any data that might have been written by the previous test - // This is required because during the shutdown of the previous test, compaction might have kicked in and the - // coprocessor still had some data to flush and it might be flushed at the beginning of this test. - truncatePruneStateTable(); - try { // Run without a transaction snapshot first long now1 = 200; @@ -334,6 +305,87 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } + @Test + public void testPruneEmptyTable() throws Exception { + // Make sure that empty tables do not block the progress of pruning + + // Create an empty table + TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable"); + HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + testUtil.compact(txEmptyTable, true); + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now flush the empty table, this will record the table region as empty, and then pruning will continue + hBaseAdmin.flush(txEmptyTable); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // fetch prune upper bound, again, this time it should work + pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1); + + // Now add some data to the empty table + // (adding data non-transactionally is okay too, we just need some data for the compaction to run) + emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1))); + emptyHTable.close(); + + // Now run another compaction on txDataTable1 with an updated tx snapshot + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS; + long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2, + ImmutableSet.of(expectedPruneUpperBound2), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + testUtil.flush(txEmptyTable); + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since + // txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being + // empty in the previous run with inactiveTxTimeNow1 + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + + // However, after compacting txEmptyTable we should get the latest upper bound + testUtil.flush(txEmptyTable); + testUtil.compact(txEmptyTable, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2); + } finally { + transactionPruningPlugin.destroy(); + hBaseAdmin.disableTable(txEmptyTable); + hBaseAdmin.deleteTable(txEmptyTable); + } + } + private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row);
