Repository: incubator-tephra
Updated Branches:
  refs/heads/master 2af5ac2bd -> 203825e29
TEPHRA-219 execute cross region calls in coprocessor as the login user. This closes #35 from GitHub. Make pruneEnable and txMaxLifetimeMillis volatile so that derived classes can make use of it. Introduced stopped variable in PruneUpperBoundWriter. Signed-off-by: Gokul Gunasekaran <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/203825e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/203825e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/203825e2 Branch: refs/heads/master Commit: 203825e29af43b24029ef83dc512a4b8a3deeadc Parents: 2af5ac2 Author: Gokul Gunasekaran <[email protected]> Authored: Sat Feb 11 19:42:02 2017 -0800 Committer: Gokul Gunasekaran <[email protected]> Committed: Mon Feb 13 21:40:11 2017 -0800 ---------------------------------------------------------------------- .../hbase/coprocessor/TransactionProcessor.java | 95 ++++++++++--------- .../hbase/txprune/PruneUpperBoundWriter.java | 58 ++++++++---- .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++--------- .../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++---- .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++--------- .../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++---- .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++--------- .../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++---- .../hbase/coprocessor/TransactionProcessor.java | 97 +++++++++++--------- .../hbase/txprune/PruneUpperBoundWriter.java | 44 +++++---- 10 files changed, 405 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 9ff4d3b..d2402a6 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; - private CompactionState compactionState; + private volatile CompactionState compactionState; + + protected volatile Boolean pruneEnable; + protected volatile Long txMaxLifetimeMillis; protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected Long txMaxLifetimeMillis; - private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver { } this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); + this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + 
tableDesc.getNameAsString()); } + initializePruneState(env); } } @@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void stop(CoprocessorEnvironment e) throws IOException { - if (compactionState != null) { - compactionState.stop(); - } + resetPruneState(); } @Override @@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver { Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; } + private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); + if (conf != null) { + return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } + return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver { LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s", region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles)); if (memstoreSize == 0 && numStoreFiles == 0) { - if (pruneEnable == null) { - initPruneState(e); - } - - if (Boolean.TRUE.equals(pruneEnable)) { + if (compactionState != null) { compactionState.persistRegionEmpty(System.currentTimeMillis()); } } - } @Override @@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - if (pruneEnable == null) { - initPruneState(c); - } - - if (Boolean.TRUE.equals(pruneEnable)) { - // Record tx state before the compaction + // Record tx state before the compaction + if (compactionState != null) { compactionState.record(request, snapshot); } @@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver { return; } - if (txMaxLifetimeMillis == null) { - Configuration conf = getConfiguration(env); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily - if (conf != null) { - this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - } else { - throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + - "unavailable. Please retry the operation.")); - } - } - boolean validLifetime = - TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); + (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis(); if (!validLifetime) { throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms", tx.getTransactionId(), txMaxLifetimeMillis)); @@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver { return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } - private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) { - Configuration conf = getConfiguration(c.getEnvironment()); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily + /** + * Refresh the properties related to transaction pruning. 
This method needs to be invoked if there is change in the + * prune related properties after clearing the state by calling {@link #resetPruneState}. + * + * @param env {@link RegionCoprocessorEnvironment} of this region + */ + protected void initializePruneState(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); if (conf != null) { pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + if (Boolean.TRUE.equals(pruneEnable)) { - String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - long pruneFlushInterval = TimeUnit.SECONDS.toMillis( - conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, - TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); - compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); + TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong( + TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); + + compactionState = new CompactionState(env, pruneTable, pruneFlushInterval); if (LOG.isDebugEnabled()) { - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " - + pruneTable); + TableName name = env.getRegion().getRegionInfo().getTable(); + LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " + + "be recorded in table %s:%s", name.getNamespaceAsString(), name.getNameAsString(), + pruneTable.getNamespaceAsString(), pruneTable.getNameAsString())); } } } } + /** + * Stop and clear state related to pruning. 
+ */ + protected void resetPruneState() { + pruneEnable = false; + if (compactionState != null) { + compactionState.stop(); + compactionState = null; + } + } + private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) { long numStoreFiles = 0; for (Store store : c.getEnvironment().getRegion().getStores().values()) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index beed1ad..7e4a0fa 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -23,8 +23,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; @@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { private final ConcurrentSkipListMap<byte[], Long> emptyRegions; private volatile Thread flushThread; + private volatile boolean stopped; private long lastChecked; @@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { @Override protected void shutDown() throws Exception { LOG.info("Stopping PruneUpperBoundWriter Thread."); + stopped = true; if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); @@ -97,30 +101,36 @@ public class PruneUpperBoundWriter extends AbstractIdleService { flushThread = new Thread("tephra-prune-upper-bound-writer") { @Override public void run() { - while ((!isInterrupted()) && isRunning()) { + while ((!isInterrupted()) && (!stopped)) { long now = System.currentTimeMillis(); if (now > (lastChecked + pruneFlushInterval)) { // should flush data try { - // Record prune upper bound - while (!pruneEntries.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); - dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new pruneUpperBound for the same key has been added - pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); - } - // Record empty regions - while (!emptyRegions.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); - dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new value for the same key has been added - emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); - } - } catch (IOException ex) { - LOG.warn("Cannot record prune upper bound for a region to table " + - tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex); + UserGroupInformation.getLoginUser().doAs(new 
PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // Record prune upper bound + while (!pruneEntries.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); + dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new pruneUpperBound for the same key has been added + pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); + } + // Record empty regions + while (!emptyRegions.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); + dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new value for the same key has been added + emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); + } + return null; + } + }); + } catch (IOException | InterruptedException ex) { + // Handle any exception that might be thrown during HBase operation + handleException(ex); } lastChecked = now; } @@ -147,4 +157,12 @@ public class PruneUpperBoundWriter extends AbstractIdleService { Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running")); } } + + private void handleException(Exception ex) { + LOG.warn("Cannot record prune upper bound for a region to table " + + tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex); + if (ex instanceof IOException) { + Thread.currentThread().interrupt(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 7485b91..84776cf 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; - private CompactionState compactionState; + private volatile CompactionState compactionState; + + protected volatile Boolean pruneEnable; + protected volatile Long txMaxLifetimeMillis; protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected Long txMaxLifetimeMillis; - private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver { } this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); + this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } + initializePruneState(env); } } @@ -171,9 +174,7 @@ public class TransactionProcessor 
extends BaseRegionObserver { @Override public void stop(CoprocessorEnvironment e) throws IOException { - if (compactionState != null) { - compactionState.stop(); - } + resetPruneState(); } @Override @@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver { Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; } + private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); + if (conf != null) { + return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } + return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver { LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s", region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles)); if (memstoreSize == 0 && numStoreFiles == 0) { - if (pruneEnable == null) { - initPruneState(e); - } - - if (Boolean.TRUE.equals(pruneEnable)) { + if (compactionState != null) { compactionState.persistRegionEmpty(System.currentTimeMillis()); } } - } @Override @@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - if (pruneEnable == null) { - initPruneState(c); - } - - if (Boolean.TRUE.equals(pruneEnable)) { - // Record tx state before the compaction + // Record tx state before the compaction + if (compactionState != null) { compactionState.record(request, snapshot); } @@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile, CompactionRequest request) throws IOException { - // Persist the compaction state after a succesful compaction + // Persist the compaction state after a successful compaction if (compactionState != null) { compactionState.persist(); } @@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver { return; } - if (txMaxLifetimeMillis == null) { - Configuration conf = getConfiguration(env); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily - if (conf != null) { - this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - } else { - throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + - "unavailable. 
Please retry the operation.")); - } - } - boolean validLifetime = - TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); + (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis(); if (!validLifetime) { throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms", tx.getTransactionId(), txMaxLifetimeMillis)); @@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver { return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } - private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) { - Configuration conf = getConfiguration(c.getEnvironment()); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily + /** + * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the + * prune related properties after clearing the state by calling {@link #resetPruneState}. + * + * @param env {@link RegionCoprocessorEnvironment} of this region + */ + protected void initializePruneState(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); if (conf != null) { pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + if (Boolean.TRUE.equals(pruneEnable)) { - String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - long pruneFlushInterval = TimeUnit.SECONDS.toMillis( - conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, - TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); - compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); + TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong( + TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); + + compactionState = new CompactionState(env, pruneTable, pruneFlushInterval); if (LOG.isDebugEnabled()) { - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " - + pruneTable); + TableName name = env.getRegion().getRegionInfo().getTable(); + LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " + + "be recorded in table %s:%s", name.getNamespaceAsString(), name.getNameAsString(), + pruneTable.getNamespaceAsString(), pruneTable.getNameAsString())); } } } } + /** + * Stop and clear state related to pruning. 
+ */ + protected void resetPruneState() { + pruneEnable = false; + if (compactionState != null) { + compactionState.stop(); + compactionState = null; + } + } + private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) { long numStoreFiles = 0; for (Store store : c.getEnvironment().getRegion().getStores().values()) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index beed1ad..38c1a6f 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; @@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { private final ConcurrentSkipListMap<byte[], Long> emptyRegions; private volatile Thread flushThread; + private volatile boolean stopped; private long lastChecked; @@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { @Override protected void shutDown() throws Exception { LOG.info("Stopping PruneUpperBoundWriter Thread."); + stopped = true; if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); @@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService { flushThread = new Thread("tephra-prune-upper-bound-writer") { @Override public void run() { - while ((!isInterrupted()) && isRunning()) { + while ((!isInterrupted()) && (!stopped)) { long now = System.currentTimeMillis(); if (now > (lastChecked + pruneFlushInterval)) { // should flush data try { - // Record prune upper bound - while (!pruneEntries.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); - dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new pruneUpperBound for the same key has been added - pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); - } - // Record empty regions - while (!emptyRegions.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); - dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new value for the same key has been added - emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); - } + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // Record prune upper bound + while (!pruneEntries.isEmpty()) { + 
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); + dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new pruneUpperBound for the same key has been added + pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); + } + // Record empty regions + while (!emptyRegions.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); + dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new value for the same key has been added + emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); + } + return null; + } + }); } catch (IOException ex) { LOG.warn("Cannot record prune upper bound for a region to table " + tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 7485b91..b73bdc1 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; - private CompactionState compactionState; + private volatile CompactionState compactionState; + + protected volatile Boolean pruneEnable; + protected volatile Long txMaxLifetimeMillis; protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected Long txMaxLifetimeMillis; - private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver { } this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); + this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } + initializePruneState(env); } } @@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void stop(CoprocessorEnvironment e) throws IOException { - if (compactionState != null) { - compactionState.stop(); - } + resetPruneState(); } @Override @@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver { Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; } + private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); + if (conf != null) { + return 
TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } + return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver { LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s", region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles)); if (memstoreSize == 0 && numStoreFiles == 0) { - if (pruneEnable == null) { - initPruneState(e); - } - - if (Boolean.TRUE.equals(pruneEnable)) { + if (compactionState != null) { compactionState.persistRegionEmpty(System.currentTimeMillis()); } } - } @Override @@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - if (pruneEnable == null) { - initPruneState(c); - } - - if (Boolean.TRUE.equals(pruneEnable)) { - // Record tx state before the compaction + // Record tx state before the compaction + if (compactionState != null) { compactionState.record(request, snapshot); } @@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile, CompactionRequest request) throws IOException { - // Persist the compaction state after a succesful compaction + // Persist the compaction state after a successful compaction if (compactionState != null) { compactionState.persist(); } @@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver { return; } - if (txMaxLifetimeMillis == null) { - Configuration conf = getConfiguration(env); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily - if (conf != null) { - this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - } else { - throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + - "unavailable. Please retry the operation.")); - } - } - boolean validLifetime = - TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); + (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis(); if (!validLifetime) { throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms", tx.getTransactionId(), txMaxLifetimeMillis)); @@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver { return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } - private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) { - Configuration conf = getConfiguration(c.getEnvironment()); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily + /** + * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the + * prune related properties after clearing the state by calling {@link #resetPruneState}. 
+ * + * @param env {@link RegionCoprocessorEnvironment} of this region + */ + protected void initializePruneState(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); if (conf != null) { pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + if (Boolean.TRUE.equals(pruneEnable)) { - String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - long pruneFlushInterval = TimeUnit.SECONDS.toMillis( - conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, - TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); - compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); + TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong( + TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); + + compactionState = new CompactionState(env, pruneTable, pruneFlushInterval); if (LOG.isDebugEnabled()) { - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " - + pruneTable); + LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " + + "will be recorded in table %s", + env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(), + pruneTable.getNameWithNamespaceInclAsString())); } } } } + /** + * Stop and clear state related to pruning. + */ + protected void resetPruneState() { + pruneEnable = false; + if (compactionState != null) { + compactionState.stop(); + compactionState = null; + } + } + private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) { long numStoreFiles = 0; for (Store store : c.getEnvironment().getRegion().getStores().values()) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 9773a15..6bd8bab 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; @@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { private final ConcurrentSkipListMap<byte[], Long> emptyRegions; private volatile Thread flushThread; + private volatile boolean stopped; private long lastChecked; @@ -87,6 +90,7 @@ public class 
PruneUpperBoundWriter extends AbstractIdleService { @Override protected void shutDown() throws Exception { LOG.info("Stopping PruneUpperBoundWriter Thread."); + stopped = true; if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); @@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService { flushThread = new Thread("tephra-prune-upper-bound-writer") { @Override public void run() { - while ((!isInterrupted()) && isRunning()) { + while ((!isInterrupted()) && (!stopped)) { long now = System.currentTimeMillis(); if (now > (lastChecked + pruneFlushInterval)) { // should flush data try { - // Record prune upper bound - while (!pruneEntries.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); - dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new pruneUpperBound for the same key has been added - pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); - } - // Record empty regions - while (!emptyRegions.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); - dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new value for the same key has been added - emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); - } + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // Record prune upper bound + while (!pruneEntries.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); + dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new pruneUpperBound for the same key has been added + pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); + } + // Record empty regions + while (!emptyRegions.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); + dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new value for the same key has been added + emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); + } + return null; + } + }); } catch (IOException ex) { LOG.warn("Cannot record prune upper bound for a region to table " + tableName.getNameWithNamespaceInclAsString(), ex); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 7485b91..f9bb35e 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver { private 
final TransactionCodec txCodec; private TransactionStateCache cache; - private CompactionState compactionState; + private volatile CompactionState compactionState; + + protected volatile Boolean pruneEnable; + protected volatile Long txMaxLifetimeMillis; protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected Long txMaxLifetimeMillis; - private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver { } this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); + this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } + initializePruneState(env); } } @@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void stop(CoprocessorEnvironment e) throws IOException { - if (compactionState != null) { - compactionState.stop(); - } + resetPruneState(); } @Override @@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver { Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; } + private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); + if (conf != null) { + return TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } + return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver { LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s", region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles)); if (memstoreSize == 0 && numStoreFiles == 0) { - if (pruneEnable == null) { - initPruneState(e); - } - - if (Boolean.TRUE.equals(pruneEnable)) { + if (compactionState != null) { compactionState.persistRegionEmpty(System.currentTimeMillis()); } } - } @Override @@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - if (pruneEnable == null) { - initPruneState(c); - } - - if (Boolean.TRUE.equals(pruneEnable)) { - // Record tx state before the compaction + // Record tx state before the compaction + if (compactionState != null) { compactionState.record(request, snapshot); } @@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile, CompactionRequest request) throws IOException { - // Persist the compaction state after a succesful compaction + // Persist the compaction state after a successful compaction if (compactionState != null) { compactionState.persist(); } @@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver { return; } - if (txMaxLifetimeMillis == null) { - Configuration conf = 
getConfiguration(env); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily - if (conf != null) { - this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - } else { - throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + - "unavailable. Please retry the operation.")); - } - } - boolean validLifetime = - TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); + (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis(); if (!validLifetime) { throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms", tx.getTransactionId(), txMaxLifetimeMillis)); @@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver { return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } - private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) { - Configuration conf = getConfiguration(c.getEnvironment()); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily + /** + * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the + * prune related properties after clearing the state by calling {@link #resetPruneState}. + * + * @param env {@link RegionCoprocessorEnvironment} of this region + */ + protected void initializePruneState(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); if (conf != null) { pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + if (Boolean.TRUE.equals(pruneEnable)) { - String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - long pruneFlushInterval = TimeUnit.SECONDS.toMillis( - conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, - TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); - compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); + TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong( + TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); + + compactionState = new CompactionState(env, pruneTable, pruneFlushInterval); if (LOG.isDebugEnabled()) { - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " - + pruneTable); + LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " + + "be recorded in table %s:%s", env.getRegionInfo().getTable().getNamespaceAsString(), + env.getRegionInfo().getTable().getNameAsString(), pruneTable.getNamespaceAsString(), + pruneTable.getNameAsString())); } } } } + /** + * Stop and clear state related to pruning. 
+ */ + protected void resetPruneState() { + pruneEnable = false; + if (compactionState != null) { + compactionState.stop(); + compactionState = null; + } + } + private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) { long numStoreFiles = 0; for (Store store : c.getEnvironment().getRegion().getStores().values()) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index beed1ad..38c1a6f 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; @@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { private final ConcurrentSkipListMap<byte[], Long> emptyRegions; private volatile Thread flushThread; + private volatile boolean stopped; private long lastChecked; @@ -87,6 +90,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { @Override protected void shutDown() throws Exception { LOG.info("Stopping PruneUpperBoundWriter Thread."); + stopped = true; if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); @@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService { flushThread = new Thread("tephra-prune-upper-bound-writer") { @Override public void run() { - while ((!isInterrupted()) && isRunning()) { + while ((!isInterrupted()) && (!stopped)) { long now = System.currentTimeMillis(); if (now > (lastChecked + pruneFlushInterval)) { // should flush data try { - // Record prune upper bound - while (!pruneEntries.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); - dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new pruneUpperBound for the same key has been added - pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); - } - // Record empty regions - while (!emptyRegions.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); - dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new value for the same key has been added - emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); - } + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // Record prune upper bound + while (!pruneEntries.isEmpty()) { + 
Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); + dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new pruneUpperBound for the same key has been added + pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); + } + // Record empty regions + while (!emptyRegions.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); + dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new value for the same key has been added + emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); + } + return null; + } + }); } catch (IOException ex) { LOG.warn("Cannot record prune upper bound for a region to table " + tableName.getNamespaceAsString() + ":" + tableName.getNameAsString(), ex); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 5e1b4c5..02e2dac 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -107,12 +107,13 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; - private CompactionState compactionState; + private volatile CompactionState compactionState; + + protected volatile Boolean pruneEnable; + protected volatile Long txMaxLifetimeMillis; protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected Long txMaxLifetimeMillis; - private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -144,10 +145,12 @@ public class TransactionProcessor extends BaseRegionObserver { } this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); + this.txMaxLifetimeMillis = getTxMaxLifetimeMillis(env); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } + initializePruneState(env); } } @@ -171,9 +174,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void stop(CoprocessorEnvironment e) throws IOException { - if (compactionState != null) { - compactionState.stop(); - } + resetPruneState(); } @Override @@ -248,6 +249,15 @@ public class TransactionProcessor extends BaseRegionObserver { Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; } + private long getTxMaxLifetimeMillis(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); + if (conf != null) { + return 
TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } + return TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -321,15 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver { LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s", region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles)); if (memstoreSize == 0 && numStoreFiles == 0) { - if (pruneEnable == null) { - initPruneState(e); - } - - if (Boolean.TRUE.equals(pruneEnable)) { + if (compactionState != null) { compactionState.persistRegionEmpty(System.currentTimeMillis()); } } - } @Override @@ -340,12 +345,8 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - if (pruneEnable == null) { - initPruneState(c); - } - - if (Boolean.TRUE.equals(pruneEnable)) { - // Record tx state before the compaction + // Record tx state before the compaction + if (compactionState != null) { compactionState.record(request, snapshot); } @@ -356,7 +357,7 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile, CompactionRequest request) throws IOException { - // Persist the compaction state after a succesful compaction + // Persist the compaction state after a successful compaction if (compactionState != null) { compactionState.persist(); } @@ -416,21 +417,8 @@ public class TransactionProcessor extends BaseRegionObserver { return; } - if (txMaxLifetimeMillis == null) { - Configuration conf = getConfiguration(env); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily - if (conf != null) { - this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - } else { - throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + - "unavailable. Please retry the operation.")); - } - } - boolean validLifetime = - TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); + (TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis) > System.currentTimeMillis(); if (!validLifetime) { throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms", tx.getTransactionId(), txMaxLifetimeMillis)); @@ -454,28 +442,47 @@ public class TransactionProcessor extends BaseRegionObserver { return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } - private void initPruneState(ObserverContext<RegionCoprocessorEnvironment> c) { - Configuration conf = getConfiguration(c.getEnvironment()); - // Configuration won't be null in TransactionProcessor but the derived classes might return - // null if it is not available temporarily + /** + * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the + * prune related properties after clearing the state by calling {@link #resetPruneState}. 
+ * + * @param env {@link RegionCoprocessorEnvironment} of this region + */ + protected void initializePruneState(RegionCoprocessorEnvironment env) { + Configuration conf = getConfiguration(env); if (conf != null) { pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + if (Boolean.TRUE.equals(pruneEnable)) { - String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - long pruneFlushInterval = TimeUnit.SECONDS.toMillis( - conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, - TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); - compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); + TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong( + TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); + + compactionState = new CompactionState(env, pruneTable, pruneFlushInterval); if (LOG.isDebugEnabled()) { - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " - + pruneTable); + LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s. Compaction state " + + "will be recorded in table %s", + env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(), + pruneTable.getNameWithNamespaceInclAsString())); } } } } + /** + * Stop and clear state related to pruning. + */ + protected void resetPruneState() { + pruneEnable = false; + if (compactionState != null) { + compactionState.stop(); + compactionState = null; + } + } + private long numStoreFilesForRegion(ObserverContext<RegionCoprocessorEnvironment> c) { long numStoreFiles = 0; for (Store store : c.getEnvironment().getRegion().getStores()) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/203825e2/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 9773a15..6bd8bab 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -22,9 +22,11 @@ import com.google.common.util.concurrent.AbstractIdleService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; @@ -45,6 +47,7 @@ public class PruneUpperBoundWriter extends AbstractIdleService { private final ConcurrentSkipListMap<byte[], Long> emptyRegions; private volatile Thread flushThread; + private volatile boolean stopped; private long lastChecked; @@ -87,6 +90,7 @@ public class PruneUpperBoundWriter 
extends AbstractIdleService { @Override protected void shutDown() throws Exception { LOG.info("Stopping PruneUpperBoundWriter Thread."); + stopped = true; if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); @@ -97,27 +101,33 @@ public class PruneUpperBoundWriter extends AbstractIdleService { flushThread = new Thread("tephra-prune-upper-bound-writer") { @Override public void run() { - while ((!isInterrupted()) && isRunning()) { + while ((!isInterrupted()) && (!stopped)) { long now = System.currentTimeMillis(); if (now > (lastChecked + pruneFlushInterval)) { // should flush data try { - // Record prune upper bound - while (!pruneEntries.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); - dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new pruneUpperBound for the same key has been added - pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); - } - // Record empty regions - while (!emptyRegions.isEmpty()) { - Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); - dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); - // We can now remove the entry only if the key and value match with what we wrote since it is - // possible that a new value for the same key has been added - emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); - } + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // Record prune upper bound + while (!pruneEntries.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); + dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new pruneUpperBound for the same key has been added + pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); + } + // Record empty regions + while (!emptyRegions.isEmpty()) { + Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry(); + dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new value for the same key has been added + emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue()); + } + return null; + } + }); } catch (IOException ex) { LOG.warn("Cannot record prune upper bound for a region to table " + tableName.getNameWithNamespaceInclAsString(), ex);
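----------------------------------------------------------------------

The change applied to each PruneUpperBoundWriter variant above follows one pattern: the periodic HBase writes are wrapped in a privileged action so they execute as the region server's login user rather than as the caller of the triggering RPC, and the flush loop now exits on a volatile stopped flag that shutDown() sets before interrupting the thread. The sketch below condenses that shape; it is not Tephra code, and LoginUserFlusher, start, stop and flushPendingEntries are illustrative names with the flush cadence simplified to a fixed sleep.

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.security.UserGroupInformation;

// Illustrative background writer: performs its table writes as the login user
// and stops via a volatile flag plus an interrupt, mirroring the diff above.
public class LoginUserFlusher {

  private volatile boolean stopped;
  private volatile Thread flushThread;

  public void start(final long flushIntervalMillis) {
    flushThread = new Thread("login-user-flusher") {
      @Override
      public void run() {
        while (!isInterrupted() && !stopped) {
          try {
            // Run the cross-table call with the region server's login identity,
            // not the identity of whichever client RPC triggered the hook.
            UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
              @Override
              public Void run() throws Exception {
                flushPendingEntries(); // hypothetical stand-in for the DataJanitorState writes
                return null;
              }
            });
          } catch (IOException | InterruptedException ex) {
            // Simplified handling: mark the thread interrupted so the loop exits;
            // the real writer logs the failure and only interrupts on IOException.
            interrupt();
          }
          try {
            TimeUnit.MILLISECONDS.sleep(flushIntervalMillis);
          } catch (InterruptedException e) {
            interrupt();
          }
        }
      }
    };
    flushThread.setDaemon(true);
    flushThread.start();
  }

  public void stop() throws InterruptedException {
    stopped = true;                 // volatile write is visible to the flush thread
    if (flushThread != null) {
      flushThread.interrupt();      // wake the thread if it is sleeping
      flushThread.join(TimeUnit.SECONDS.toMillis(1));
    }
  }

  private void flushPendingEntries() {
    // Placeholder: the real writer drains the pruneEntries and emptyRegions maps here.
  }
}

In the 0.96 compat module the commit calls UserGroupInformation.getLoginUser().doAs(...) directly, while the 0.98, 1.0, 1.0-cdh and 1.1 modules use the HBase wrapper User.runAsLoginUser(...); both run the action under the login user.

Because pruneEnable and txMaxLifetimeMillis are now volatile and the prune lifecycle is exposed through the protected initializePruneState and resetPruneState methods, a derived coprocessor can refresh prune settings at runtime. The subclass below is a hypothetical sketch; the onConfigurationChange hook is not part of Tephra.

import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.tephra.hbase.coprocessor.TransactionProcessor;

// Hedged sketch of a derived coprocessor reacting to a configuration refresh.
public class ReloadingTransactionProcessor extends TransactionProcessor {

  // Hypothetical hook: called by the subclass when new prune properties arrive.
  public void onConfigurationChange(RegionCoprocessorEnvironment env) {
    resetPruneState();          // stops CompactionState and clears pruneEnable
    initializePruneState(env);  // re-reads the prune state table and flush interval
  }
}

----------------------------------------------------------------------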
