Repository: incubator-tephra Updated Branches: refs/heads/master 92c61e7c4 -> 87cb21a0f
TEPHRA-210 Get table specific properties from tableDescriptor and other properties from getConfiguration method, construct compactionState lazily This closes #28 from GitHub. Signed-off-by: Gokul Gunasekaran <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/87cb21a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/87cb21a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/87cb21a0 Branch: refs/heads/master Commit: 87cb21a0fff5d1e5f39594c746788761c2341b21 Parents: 92c61e7 Author: Gokul Gunasekaran <[email protected]> Authored: Thu Jan 19 18:01:14 2017 -0800 Committer: Gokul Gunasekaran <[email protected]> Committed: Thu Jan 26 12:48:34 2017 -0800 ---------------------------------------------------------------------- .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++----- .../tephra/hbase/txprune/CompactionState.java | 4 +- .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++----- .../tephra/hbase/txprune/CompactionState.java | 4 +- .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++----- .../tephra/hbase/txprune/CompactionState.java | 4 +- .../hbase/coprocessor/TransactionProcessor.java | 103 ++++++++++++++----- .../tephra/hbase/txprune/CompactionState.java | 4 +- .../hbase/coprocessor/TransactionProcessor.java | 100 +++++++++++++----- .../tephra/hbase/txprune/CompactionState.java | 4 +- 10 files changed, 384 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 6e89571..c42dd64 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver { protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + protected Long txMaxLifetimeMillis; + private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver { ttlByFamily.put(columnDesc.getName(), ttl); } - this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, - TxConstants.ALLOW_EMPTY_VALUES_DEFAULT); + this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } - - this.txMaxLifetimeMillis = - TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - - boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); - if (pruneEnabled) { - String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis); - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + - pruneTable); - } } } + /** + * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default, + * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived + * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched + * from a HBase Table. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @return {@link Configuration}, can be null if it is not available + */ + @Nullable + protected Configuration getConfiguration(CoprocessorEnvironment env) { + return env.getConfiguration(); + } + protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { Transaction tx = getFromOperation(put); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); } @Override @@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver { } Transaction tx = getFromOperation(delete); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. @@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver { List<Cell> familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } else { - int cellSize = familyCells.size(); - for (int i = 0; i < cellSize; i++) { - Cell cell = familyCells.get(i); + for (Cell cell : familyCells) { deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } } } @@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver { e.bypass(); } + private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) { + String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY); + Configuration conf = getConfiguration(env); + boolean allowEmptyValuesFromConfig = (conf != null) ? + conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) : + TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; + + // If the property is not present in the tableDescriptor, get it from the Configuration + return (allowEmptyValuesFromTableDesc != null) ? + Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - // Record tx state before the compaction - if (compactionState != null) { + if (pruneEnable == null) { + Configuration conf = getConfiguration(c.getEnvironment()); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable)); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } + } + + if (Boolean.TRUE.equals(pruneEnable)) { + // Record tx state before the compaction compactionState.record(request, snapshot); } + // Also make sure to use the same snapshot for the compaction return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs); } @@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver { return null; } - private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException { + /** + * Make sure that the transaction is within the max valid transaction lifetime. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @param tx {@link Transaction} supplied by the + * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction + * IOException throw if the value of max lifetime of transaction is unavailable + */ + protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env, + @Nullable Transaction tx) throws IOException { if (tx == null) { return; } + if (txMaxLifetimeMillis == null) { + Configuration conf = getConfiguration(env); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } else { + throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + + "unavailable. Please retry the operation.")); + } + } + boolean validLifetime = TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); if (!validLifetime) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index 1df754b..733f636 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -41,15 +41,13 @@ public class CompactionState { private final byte[] regionName; private final String regionNameAsString; private final TableName stateTable; - private final long txMaxLifetimeMills; private final DataJanitorState dataJanitorState; private volatile long pruneUpperBound = -1; - public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) { + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { this.regionName = env.getRegion().getRegionName(); this.regionNameAsString = env.getRegion().getRegionNameAsString(); this.stateTable = stateTable; - this.txMaxLifetimeMills = txMaxLifetimeMills; this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public HTableInterface get() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 92862ad..19eb09e 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver { protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + protected Long txMaxLifetimeMillis; + private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver { ttlByFamily.put(columnDesc.getName(), ttl); } - this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, - TxConstants.ALLOW_EMPTY_VALUES_DEFAULT); + this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } - - this.txMaxLifetimeMillis = - TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - - boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); - if (pruneEnabled) { - String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis); - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + - pruneTable); - } } } + /** + * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default, + * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived + * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched + * from a HBase Table. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @return {@link Configuration}, can be null if it is not available + */ + @Nullable + protected Configuration getConfiguration(CoprocessorEnvironment env) { + return env.getConfiguration(); + } + protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { Transaction tx = getFromOperation(put); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); } @Override @@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver { } Transaction tx = getFromOperation(delete); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. @@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver { List<Cell> familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } else { - int cellSize = familyCells.size(); - for (int i = 0; i < cellSize; i++) { - Cell cell = familyCells.get(i); + for (Cell cell : familyCells) { deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } } } @@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver { e.bypass(); } + private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) { + String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY); + Configuration conf = getConfiguration(env); + boolean allowEmptyValuesFromConfig = (conf != null) ? + conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) : + TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; + + // If the property is not present in the tableDescriptor, get it from the Configuration + return (allowEmptyValuesFromTableDesc != null) ? + Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - // Record tx state before the compaction - if (compactionState != null) { + if (pruneEnable == null) { + Configuration conf = getConfiguration(c.getEnvironment()); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable)); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } + } + + if (Boolean.TRUE.equals(pruneEnable)) { + // Record tx state before the compaction compactionState.record(request, snapshot); } + // Also make sure to use the same snapshot for the compaction return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs); } @@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver { return null; } - private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException { + /** + * Make sure that the transaction is within the max valid transaction lifetime. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @param tx {@link Transaction} supplied by the + * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction + * IOException throw if the value of max lifetime of transaction is unavailable + */ + protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env, + @Nullable Transaction tx) throws IOException { if (tx == null) { return; } + if (txMaxLifetimeMillis == null) { + Configuration conf = getConfiguration(env); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } else { + throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + + "unavailable. Please retry the operation.")); + } + } + boolean validLifetime = TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); if (!validLifetime) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index 53bd96d..38a79d6 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -41,15 +41,13 @@ public class CompactionState { private final byte[] regionName; private final String regionNameAsString; private final TableName stateTable; - private final long txMaxLifetimeMills; private final DataJanitorState dataJanitorState; private volatile long pruneUpperBound = -1; - public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) { + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { this.regionName = env.getRegionInfo().getRegionName(); this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); this.stateTable = stateTable; - this.txMaxLifetimeMills = txMaxLifetimeMills; this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public HTableInterface get() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 744fa4c..19eb09e 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver { protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + protected Long txMaxLifetimeMillis; + private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver { ttlByFamily.put(columnDesc.getName(), ttl); } - this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, - TxConstants.ALLOW_EMPTY_VALUES_DEFAULT); + this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } - - this.txMaxLifetimeMillis = - TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - - boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); - if (pruneEnabled) { - String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis); - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + - pruneTable); - } } } + /** + * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default, + * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived + * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched + * from a HBase Table. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @return {@link Configuration}, can be null if it is not available + */ + @Nullable + protected Configuration getConfiguration(CoprocessorEnvironment env) { + return env.getConfiguration(); + } + protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { Transaction tx = getFromOperation(put); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); } @Override @@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver { } Transaction tx = getFromOperation(delete); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. @@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver { List<Cell> familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } else { - int cellSize = familyCells.size(); - for (int i = 0; i < cellSize; i++) { - Cell cell = familyCells.get(i); + for (Cell cell : familyCells) { deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } } } @@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver { e.bypass(); } + private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) { + String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY); + Configuration conf = getConfiguration(env); + boolean allowEmptyValuesFromConfig = (conf != null) ? + conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) : + TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; + + // If the property is not present in the tableDescriptor, get it from the Configuration + return (allowEmptyValuesFromTableDesc != null) ? + Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - // Record tx state before the compaction - if (compactionState != null) { + if (pruneEnable == null) { + Configuration conf = getConfiguration(c.getEnvironment()); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable)); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } + } + + if (Boolean.TRUE.equals(pruneEnable)) { + // Record tx state before the compaction compactionState.record(request, snapshot); } + // Also make sure to use the same snapshot for the compaction return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs); } @@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver { return null; } - private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException { + /** + * Make sure that the transaction is within the max valid transaction lifetime. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @param tx {@link Transaction} supplied by the + * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction + * IOException throw if the value of max lifetime of transaction is unavailable + */ + protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env, + @Nullable Transaction tx) throws IOException { if (tx == null) { return; } + if (txMaxLifetimeMillis == null) { + Configuration conf = getConfiguration(env); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } else { + throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + + "unavailable. Please retry the operation.")); + } + } + boolean validLifetime = TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); if (!validLifetime) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index 2e61275..850f508 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -41,15 +41,13 @@ public class CompactionState { private final byte[] regionName; private final String regionNameAsString; private final TableName stateTable; - private final long txMaxLifetimeMills; private final DataJanitorState dataJanitorState; private volatile long pruneUpperBound = -1; - public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) { + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { this.regionName = env.getRegionInfo().getRegionName(); this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); this.stateTable = stateTable; - this.txMaxLifetimeMills = txMaxLifetimeMills; this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 744fa4c..19eb09e 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -109,7 +110,8 @@ public class TransactionProcessor extends BaseRegionObserver { protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + protected Long txMaxLifetimeMillis; + private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver { ttlByFamily.put(columnDesc.getName(), ttl); } - this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, - TxConstants.ALLOW_EMPTY_VALUES_DEFAULT); + this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } - - this.txMaxLifetimeMillis = - TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - - boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); - if (pruneEnabled) { - String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis); - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + - pruneTable); - } } } + /** + * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default, + * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived + * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched + * from a HBase Table. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @return {@link Configuration}, can be null if it is not available + */ + @Nullable + protected Configuration getConfiguration(CoprocessorEnvironment env) { + return env.getConfiguration(); + } + protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { Transaction tx = getFromOperation(put); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); } @Override @@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver { } Transaction tx = getFromOperation(delete); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. @@ -216,13 +217,11 @@ public class TransactionProcessor extends BaseRegionObserver { List<Cell> familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } else { - int cellSize = familyCells.size(); - for (int i = 0; i < cellSize; i++) { - Cell cell = familyCells.get(i); + for (Cell cell : familyCells) { deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } } } @@ -234,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver { e.bypass(); } + private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) { + String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY); + Configuration conf = getConfiguration(env); + boolean allowEmptyValuesFromConfig = (conf != null) ? + conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) : + TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; + + // If the property is not present in the tableDescriptor, get it from the Configuration + return (allowEmptyValuesFromTableDesc != null) ? + Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -304,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - // Record tx state before the compaction - if (compactionState != null) { + if (pruneEnable == null) { + Configuration conf = getConfiguration(c.getEnvironment()); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable)); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } + } + + if (Boolean.TRUE.equals(pruneEnable)) { + // Record tx state before the compaction compactionState.record(request, snapshot); } + // Also make sure to use the same snapshot for the compaction return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs); } @@ -359,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver { return null; } - private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException { + /** + * Make sure that the transaction is within the max valid transaction lifetime. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @param tx {@link Transaction} supplied by the + * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction + * IOException throw if the value of max lifetime of transaction is unavailable + */ + protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env, + @Nullable Transaction tx) throws IOException { if (tx == null) { return; } + if (txMaxLifetimeMillis == null) { + Configuration conf = getConfiguration(env); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } else { + throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + + "unavailable. Please retry the operation.")); + } + } + boolean validLifetime = TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); if (!validLifetime) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index 2e61275..850f508 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -41,15 +41,13 @@ public class CompactionState { private final byte[] regionName; private final String regionNameAsString; private final TableName stateTable; - private final long txMaxLifetimeMills; private final DataJanitorState dataJanitorState; private volatile long pruneUpperBound = -1; - public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) { + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { this.regionName = env.getRegionInfo().getRegionName(); this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); this.stateTable = stateTable; - this.txMaxLifetimeMills = txMaxLifetimeMills; this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index ceffa4c..45eed50 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -103,13 +104,14 @@ import javax.annotation.Nullable; public class TransactionProcessor extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(TransactionProcessor.class); - private TransactionStateCache cache; private final TransactionCodec txCodec; + private TransactionStateCache cache; private CompactionState compactionState; protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); + protected Long txMaxLifetimeMillis; + private Boolean pruneEnable; public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -140,29 +142,28 @@ public class TransactionProcessor extends BaseRegionObserver { ttlByFamily.put(columnDesc.getName(), ttl); } - this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, - TxConstants.ALLOW_EMPTY_VALUES_DEFAULT); + this.allowEmptyValues = getAllowEmptyValues(env, tableDesc); this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } - - this.txMaxLifetimeMillis = - TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, - TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - - boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); - if (pruneEnabled) { - String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis); - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + - pruneTable); - } } } + /** + * Fetch the {@link Configuration} that contains the properties required by the coprocessor. By default, + * the HBase configuration is returned. This method will never return {@code null} in Tephra but the derived + * classes might do so if {@link Configuration} is not available temporarily (for example, if it is being fetched + * from a HBase Table. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @return {@link Configuration}, can be null if it is not available + */ + @Nullable + protected Configuration getConfiguration(CoprocessorEnvironment env) { + return env.getConfiguration(); + } + protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @@ -190,7 +191,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { Transaction tx = getFromOperation(put); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); } @Override @@ -207,7 +208,7 @@ public class TransactionProcessor extends BaseRegionObserver { } Transaction tx = getFromOperation(delete); - ensureValidTxLifetime(tx); + ensureValidTxLifetime(e.getEnvironment(), tx); // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. @@ -216,9 +217,8 @@ public class TransactionProcessor extends BaseRegionObserver { List<Cell> familyCells = delete.getFamilyCellMap().get(family); if (isFamilyDelete(familyCells)) { deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); + HConstants.EMPTY_BYTE_ARRAY); } else { - int cellSize = familyCells.size(); for (Cell cell : familyCells) { deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(), HConstants.EMPTY_BYTE_ARRAY); @@ -233,6 +233,18 @@ public class TransactionProcessor extends BaseRegionObserver { e.bypass(); } + private boolean getAllowEmptyValues(RegionCoprocessorEnvironment env, HTableDescriptor htd) { + String allowEmptyValuesFromTableDesc = htd.getValue(TxConstants.ALLOW_EMPTY_VALUES_KEY); + Configuration conf = getConfiguration(env); + boolean allowEmptyValuesFromConfig = (conf != null) ? + conf.getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, TxConstants.ALLOW_EMPTY_VALUES_DEFAULT) : + TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; + + // If the property is not present in the tableDescriptor, get it from the Configuration + return (allowEmptyValuesFromTableDesc != null) ? + Boolean.valueOf(allowEmptyValuesFromTableDesc) : allowEmptyValuesFromConfig; + } + private boolean isFamilyDelete(List<Cell> familyCells) { return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); } @@ -303,10 +315,26 @@ public class TransactionProcessor extends BaseRegionObserver { // Get the latest tx snapshot state for the compaction TransactionVisibilityState snapshot = cache.getLatestState(); - // Record tx state before the compaction - if (compactionState != null) { + if (pruneEnable == null) { + Configuration conf = getConfiguration(c.getEnvironment()); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable)); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } + } + + if (Boolean.TRUE.equals(pruneEnable)) { + // Record tx state before the compaction compactionState.record(request, snapshot); } + // Also make sure to use the same snapshot for the compaction return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs); } @@ -358,11 +386,33 @@ public class TransactionProcessor extends BaseRegionObserver { return null; } - private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException { + /** + * Make sure that the transaction is within the max valid transaction lifetime. + * + * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @param tx {@link Transaction} supplied by the + * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction + * IOException throw if the value of max lifetime of transaction is unavailable + */ + protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env, + @Nullable Transaction tx) throws IOException { if (tx == null) { return; } + if (txMaxLifetimeMillis == null) { + Configuration conf = getConfiguration(env); + // Configuration won't be null in TransactionProcessor but the derived classes might return + // null if it is not available temporarily + if (conf != null) { + this.txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + } else { + throw new IOException(String.format("Could not validate Transaction since the value of max lifetime is " + + "unavailable. Please retry the operation.")); + } + } + boolean validLifetime = TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); if (!validLifetime) { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/87cb21a0/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index 2e61275..850f508 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -41,15 +41,13 @@ public class CompactionState { private final byte[] regionName; private final String regionNameAsString; private final TableName stateTable; - private final long txMaxLifetimeMills; private final DataJanitorState dataJanitorState; private volatile long pruneUpperBound = -1; - public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMills) { + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { this.regionName = env.getRegionInfo().getRegionName(); this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); this.stateTable = stateTable; - this.txMaxLifetimeMills = txMaxLifetimeMills; this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException {
