Repository: incubator-tephra Updated Branches: refs/heads/master 36318c36d -> 79b97198c
TEPHRA-199 Transaction maximum lifetime This closes #22 Signed-off-by: poorna <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/7c8267c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/7c8267c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/7c8267c4 Branch: refs/heads/master Commit: 7c8267c4a348004d92479466037c1de69886a442 Parents: 36318c3 Author: poorna <[email protected]> Authored: Mon Dec 5 00:25:16 2016 -0800 Committer: poorna <[email protected]> Committed: Wed Dec 28 16:08:10 2016 +0530 ---------------------------------------------------------------------- .../java/org/apache/tephra/TxConstants.java | 8 +++ .../tephra/hbase/TransactionAwareHTable.java | 1 + .../hbase/coprocessor/TransactionProcessor.java | 31 ++++++++++ .../hbase/TransactionAwareHTableTest.java | 61 ++++++++++++++++++++ 4 files changed, 101 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index 25451b3..bc02936 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -158,6 +158,14 @@ public class TxConstants { * The default value for the transaction timeout limit, in seconds: unlimited. */ public static final int DEFAULT_TX_MAX_TIMEOUT = Integer.MAX_VALUE; + /** + * The maximum time in seconds that a transaction can be used for data writes. + */ + public static final String CFG_TX_MAX_LIFETIME = "data.tx.max.lifetime"; + /** + * The default value for the maximum transaction lifetime. + */ + public static final int DEFAULT_TX_MAX_LIFETIME = (int) TimeUnit.HOURS.toSeconds(25); /** The frequency (in seconds) to perform periodic snapshots, or 0 for no periodic snapshots. */ public static final String CFG_TX_SNAPSHOT_INTERVAL = "data.tx.snapshot.interval"; /** Default value for frequency of periodic snapshots of transaction state. */ http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java index bb7afff..531e010 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java @@ -649,6 +649,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable txDelete.setAttribute(entry.getKey(), entry.getValue()); } txDelete.setDurability(delete.getDurability()); + addToOperation(txDelete, tx); return txDelete; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 9f723d6..132c157 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; @@ -67,6 +68,8 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; /** * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing @@ -106,6 +109,7 @@ public class TransactionProcessor extends BaseRegionObserver { protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; + protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -143,6 +147,10 @@ public class TransactionProcessor extends BaseRegionObserver { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } + this.txMaxLifetimeMillis = + TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, TxConstants.DataJanitor.DEFAULT_PRUNE_ENABLE); if (pruneEnabled) { @@ -179,6 +187,13 @@ public class TransactionProcessor extends BaseRegionObserver { } @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) + throws IOException { + Transaction tx = getFromOperation(put); + ensureValidTxLifetime(tx); + } + + @Override public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException { // Translate deletes into our own delete tombstones @@ -191,6 +206,9 @@ public class TransactionProcessor extends BaseRegionObserver { return; } + Transaction tx = getFromOperation(delete); + ensureValidTxLifetime(tx); + // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp()); @@ -341,6 +359,19 @@ public class TransactionProcessor extends BaseRegionObserver { return null; } + private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException { + if (tx == null) { + return; + } + + boolean validLifetime = + TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); + if (!validLifetime) { + throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms", + tx.getTransactionId(), txMaxLifetimeMillis)); + } + } + private boolean isRollbackOperation(OperationWithAttributes op) throws IOException { return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null || // to support old clients http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7c8267c4/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index c336712..46ac384 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -69,6 +69,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -1490,6 +1491,66 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { transactionContext.finish(); } + @Test + public void testTxLifetime() throws Exception { + // Add some initial values + transactionContext.start(); + Put put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value); + transactionAwareHTable.put(put); + put = new Put(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value); + transactionAwareHTable.put(put); + transactionContext.finish(); + + // Simulate writing with a transaction past its max lifetime + transactionContext.start(); + Transaction currentTx = transactionContext.getCurrentTransaction(); + Assert.assertNotNull(currentTx); + + // Create a transaction that is past the max lifetime + long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + long oldTxId = currentTx.getTransactionId() - ((txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS); + Transaction oldTx = new Transaction(currentTx.getReadPointer(), oldTxId, + currentTx.getInvalids(), currentTx.getInProgress(), + currentTx.getFirstShortInProgress()); + transactionAwareHTable.updateTx(oldTx); + // Put with the old transaction should fail + put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier, TestBytes.value); + try { + transactionAwareHTable.put(put); + Assert.fail("Excepted exception with old transaction!"); + } catch (IOException e) { + // Expected exception + } + + // Delete with the old transaction should also fail + Delete delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier); + try { + transactionAwareHTable.delete(delete); + Assert.fail("Excepted exception with old transaction!"); + } catch (IOException e) { + // Expected exception + } + + // Now update the table to use the current transaction + transactionAwareHTable.updateTx(currentTx); + put = new Put(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2, TestBytes.value); + transactionAwareHTable.put(put); + delete = new Delete(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2); + transactionAwareHTable.delete(delete); + + // Verify values with the same transaction since we cannot commit the old transaction + verifyRow(transactionAwareHTable, + new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), TestBytes.value); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), null); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), TestBytes.value); + transactionContext.finish(); + } + /** * Tests that transaction co-processor works with older clients *
