TEPHRA-224: Handle the delay between the transaction max lifetime check and data writes while pruning
This closes #38 Signed-off-by: poorna <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/808ed2e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/808ed2e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/808ed2e3 Branch: refs/heads/master Commit: 808ed2e3fe40c86cf05442e0d842bbc844ddd857 Parents: 872fb10 Author: poorna <[email protected]> Authored: Tue Feb 21 17:15:20 2017 -0800 Committer: poorna <[email protected]> Committed: Wed Feb 22 14:49:16 2017 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/tephra/TxConstants.java | 6 ++++++ .../tephra/txprune/TransactionPruningRunnable.java | 12 ++++++++++-- .../tephra/txprune/TransactionPruningService.java | 9 ++++++--- .../tephra/txprune/TransactionPruningServiceTest.java | 10 ++++++---- 4 files changed, 28 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index ebf91e3..26a48fb 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -376,6 +376,11 @@ public class TxConstants { public static final String PRUNE_FLUSH_INTERVAL = "data.tx.prune.flush.interval"; /** + * The time in seconds used to pad transaction max lifetime while pruning. 
+ */ + public static final String PRUNE_GRACE_PERIOD = "data.tx.grace.period"; + + /** * Comma separated list of invalid transaction pruning plugins to load */ public static final String PLUGINS = "data.tx.prune.plugins"; @@ -388,6 +393,7 @@ public class TxConstants { public static final String DEFAULT_PRUNE_STATE_TABLE = "tephra.state"; public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6); public static final long DEFAULT_PRUNE_FLUSH_INTERVAL = TimeUnit.MINUTES.toSeconds(1); + public static final long DEFAULT_PRUNE_GRACE_PERIOD = TimeUnit.HOURS.toSeconds(24); public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default"; public static final String DEFAULT_PLUGIN_CLASS = "org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin"; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java index d73c50a..89ed25e 100644 --- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java +++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java @@ -36,18 +36,21 @@ import java.util.TreeSet; * This class executes one run of transaction pruning every time it is invoked. * Typically, this class will be scheduled to run periodically. 
*/ +@SuppressWarnings("WeakerAccess") public class TransactionPruningRunnable implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningRunnable.class); private final TransactionManager txManager; private final Map<String, TransactionPruningPlugin> plugins; private final long txMaxLifetimeMillis; + private final long txPruneBufferMillis; public TransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, - long txMaxLifetimeMillis) { + long txMaxLifetimeMillis, long txPruneBufferMillis) { this.txManager = txManager; this.plugins = plugins; this.txMaxLifetimeMillis = txMaxLifetimeMillis; + this.txPruneBufferMillis = txPruneBufferMillis; } @Override @@ -57,8 +60,13 @@ public class TransactionPruningRunnable implements Runnable { Transaction tx = txManager.startShort(); txManager.abort(tx); + if (tx.getInvalids().length == 0) { + LOG.info("Invalid list is empty, not running transaction pruning"); + return; + } + long now = getTime(); - long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis); + long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis + txPruneBufferMillis); LOG.info("Starting invalid prune run for time {} and inactive transaction bound {}", now, inactiveTransactionBound); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java index d80bbd4..8d7fe2f 100644 --- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java +++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java @@ -78,8 +78,11 @@ public class 
TransactionPruningService extends AbstractIdleService { Map<String, TransactionPruningPlugin> plugins = initializePlugins(); long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + long txPruneBufferMillis = + TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, + TxConstants.TransactionPruning.DEFAULT_PRUNE_GRACE_PERIOD)); scheduledExecutorService.scheduleAtFixedRate( - getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis), + getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis), scheduleInterval, scheduleInterval, TimeUnit.SECONDS); LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval); } @@ -104,8 +107,8 @@ public class TransactionPruningService extends AbstractIdleService { @VisibleForTesting TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, - long txMaxLifetimeMillis) { - return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis); + long txMaxLifetimeMillis, long txPruneBufferMillis) { + return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis); } private Map<String, TransactionPruningPlugin> initializePlugins() http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/808ed2e3/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java index 9c23ab7..2a0a17e 100644 --- a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java +++ 
b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java @@ -69,6 +69,7 @@ public class TransactionPruningServiceTest { conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1); conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10); + conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0); // Setup mock data long m = 1000; @@ -132,6 +133,7 @@ public class TransactionPruningServiceTest { conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1); conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10); + conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0); // Setup mock data long m = 1000; @@ -222,8 +224,8 @@ public class TransactionPruningServiceTest { @Override TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, - long txMaxLifetimeMillis) { - return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis); + long txMaxLifetimeMillis, long txPruneBufferMillis) { + return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis); } } @@ -233,8 +235,8 @@ public class TransactionPruningServiceTest { private static class TestTransactionPruningRunnable extends TransactionPruningRunnable { private static Iterator<Long> currentTime; TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, - long txMaxLifetimeMillis) { - super(txManager, plugins, txMaxLifetimeMillis); + long txMaxLifetimeMillis, long txPruneBufferMillis) { + super(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis); } @Override
