Support old clients on operation attribute name change
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/e0a6e0a5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/e0a6e0a5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/e0a6e0a5 Branch: refs/heads/master Commit: e0a6e0a569789afc2872a3a247976b7d08aa463c Parents: 6cc3adb Author: poorna <[email protected]> Authored: Tue May 10 21:46:43 2016 -0700 Committer: poorna <[email protected]> Committed: Tue May 10 21:46:43 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/tephra/TxConstants.java | 8 +++ .../tephra/hbase96/TransactionAwareHTable.java | 6 +- .../coprocessor/TransactionProcessor.java | 12 +++- .../hbase96/TransactionAwareHTableTest.java | 62 ++++++++++++++++++++ .../tephra/hbase98/TransactionAwareHTable.java | 6 +- .../coprocessor/TransactionProcessor.java | 12 +++- .../hbase98/TransactionAwareHTableTest.java | 62 ++++++++++++++++++++ .../hbase10cdh/TransactionAwareHTable.java | 6 +- .../coprocessor/TransactionProcessor.java | 12 +++- .../hbase10cdh/TransactionAwareHTableTest.java | 62 ++++++++++++++++++++ .../tephra/hbase10/TransactionAwareHTable.java | 6 +- .../coprocessor/TransactionProcessor.java | 12 +++- .../hbase10/TransactionAwareHTableTest.java | 62 ++++++++++++++++++++ .../tephra/hbase11/TransactionAwareHTable.java | 6 +- .../coprocessor/TransactionProcessor.java | 12 +++- .../hbase11/TransactionAwareHTableTest.java | 62 ++++++++++++++++++++ 16 files changed, 398 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index d962486..61ee3cc 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -88,11 +88,19 @@ public class TxConstants { */ public static final String TX_OPERATION_ATTRIBUTE_KEY = "tephra.tx"; /** + * @deprecated This constant is replaced by {@link #TX_OPERATION_ATTRIBUTE_KEY} + */ + public static final String OLD_TX_OPERATION_ATTRIBUTE_KEY = "cask.tx"; + /** * Key used to flag a delete operation as part of a transaction rollback. This is used so that the * {@code TransactionProcessor} coprocessor loaded on a table can differentiate between deletes issued * as part of a normal client operation versus those performed when rolling back a transaction. */ public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "tephra.tx.rollback"; + /** + * @deprecated This constant is replaced by {@link #TX_ROLLBACK_ATTRIBUTE_KEY} + */ + public static final String OLD_TX_ROLLBACK_ATTRIBUTE_KEY = "cask.tx.rollback"; /** * Column qualifier used for a special delete marker tombstone, which identifies an entire column family as deleted. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java index f9a3ff3..4633248 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java @@ -139,7 +139,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable byte[] family = change.getFamily(); byte[] qualifier = change.getQualifier(); Delete rollbackDelete = new Delete(row); - rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + makeRollbackOperation(rollbackDelete); switch (conflictLevel) { case ROW: case NONE: @@ -612,4 +612,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); } + + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java index 136dedc..c74f98d 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java @@ -173,7 +173,7 @@ public class TransactionProcessor extends BaseRegionObserver { // Deletes that are part of a transaction rollback do not need special handling. // They will never be rolled back, so are performed as normal HBase deletes. - if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { + if (isRollbackOperation(delete)) { return; } @@ -301,12 +301,22 @@ public class TransactionProcessor extends BaseRegionObserver { private Transaction getFromOperation(OperationWithAttributes op) throws IOException { byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY); + if (encoded == null) { + // to support old clients + encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY); + } if (encoded != null) { return txCodec.decode(encoded); } return null; } + private boolean isRollbackOperation(OperationWithAttributes op) throws IOException { + return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null || + // to support old clients + op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null; + } + /** * Derived classes can override this method to customize the filter used to return data visible for the current * transaction. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java index 6760ed7..1edd4d7 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java @@ -37,6 +37,7 @@ package org.apache.tephra.hbase96; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; + import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -1552,4 +1553,65 @@ public class TransactionAwareHTableTest { assertTrue(result.isEmpty()); transactionContext.finish(); } + + /** + * Tests that transaction co-processor works with older clients + * + * @throws Exception + */ + @Test + public void testOlderClientOperations() throws Exception { + // Use old HTable to test + TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable); + transactionContext.addTransactionAware(oldTxAware); + + transactionContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + oldTxAware.put(put); + transactionContext.finish(); + + transactionContext.start(); + long txId = transactionContext.getCurrentTransaction().getTransactionId(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2); + oldTxAware.put(put); + // Invalidate the second Put + TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager); + txClient.invalidate(txId); + + transactionContext.start(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3); + oldTxAware.put(put); + // Abort the third Put + transactionContext.abort(); + + // Get should now return the first value + transactionContext.start(); + Result result = oldTxAware.get(new Get(TestBytes.row)); + transactionContext.finish(); + + byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); + assertArrayEquals(TestBytes.value, value); + } + + /** + * Represents older transaction clients + */ + private static class OldTransactionAwareHTable extends TransactionAwareHTable { + public OldTransactionAwareHTable(HTableInterface hTable) { + super(hTable); + } + + @Override + public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { + op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); + } + + @Override + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java index 54babd1..9c04d8f 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java @@ -143,7 +143,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable byte[] family = change.getFamily(); byte[] qualifier = change.getQualifier(); Delete rollbackDelete = new Delete(row); - rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + makeRollbackOperation(rollbackDelete); switch (conflictLevel) { case ROW: case NONE: @@ -639,4 +639,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); } + + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java index e8e045a..cc1915d 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java @@ -173,7 +173,7 @@ public class TransactionProcessor extends BaseRegionObserver { // Deletes that are part of a transaction rollback do not need special handling. // They will never be rolled back, so are performed as normal HBase deletes. - if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { + if (isRollbackOperation(delete)) { return; } @@ -301,12 +301,22 @@ public class TransactionProcessor extends BaseRegionObserver { private Transaction getFromOperation(OperationWithAttributes op) throws IOException { byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY); + if (encoded == null) { + // to support old clients + encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY); + } if (encoded != null) { return txCodec.decode(encoded); } return null; } + private boolean isRollbackOperation(OperationWithAttributes op) throws IOException { + return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null || + // to support old clients + op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null; + } + /** * Derived classes can override this method to customize the filter used to return data visible for the current * transaction. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java index 908f9c8..c4062df 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -1549,4 +1550,65 @@ public class TransactionAwareHTableTest { assertTrue(result.isEmpty()); transactionContext.finish(); } + + /** + * Tests that transaction co-processor works with older clients + * + * @throws Exception + */ + @Test + public void testOlderClientOperations() throws Exception { + // Use old HTable to test + TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable); + transactionContext.addTransactionAware(oldTxAware); + + transactionContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + oldTxAware.put(put); + transactionContext.finish(); + + transactionContext.start(); + long txId = transactionContext.getCurrentTransaction().getTransactionId(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2); + oldTxAware.put(put); + // Invalidate the second Put + TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager); + txClient.invalidate(txId); + + transactionContext.start(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3); + oldTxAware.put(put); + // Abort the third Put + transactionContext.abort(); + + // Get should now return the first value + transactionContext.start(); + Result result = oldTxAware.get(new Get(TestBytes.row)); + transactionContext.finish(); + + byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); + assertArrayEquals(TestBytes.value, value); + } + + /** + * Represents older transaction clients + */ + private static class OldTransactionAwareHTable extends TransactionAwareHTable { + public OldTransactionAwareHTable(HTableInterface hTable) { + super(hTable); + } + + @Override + public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { + op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); + } + + @Override + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java index ea9fdad..62cdafd 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java @@ -143,7 +143,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable byte[] family = change.getFamily(); byte[] qualifier = change.getQualifier(); Delete rollbackDelete = new Delete(row); - rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + makeRollbackOperation(rollbackDelete); switch (conflictLevel) { case ROW: case NONE: @@ -671,4 +671,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); } + + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java index 45ac114..f219373 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java @@ -173,7 +173,7 @@ public class TransactionProcessor extends BaseRegionObserver { // Deletes that are part of a transaction rollback do not need special handling. // They will never be rolled back, so are performed as normal HBase deletes. - if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { + if (isRollbackOperation(delete)) { return; } @@ -301,12 +301,22 @@ public class TransactionProcessor extends BaseRegionObserver { private Transaction getFromOperation(OperationWithAttributes op) throws IOException { byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY); + if (encoded == null) { + // to support old clients + encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY); + } if (encoded != null) { return txCodec.decode(encoded); } return null; } + private boolean isRollbackOperation(OperationWithAttributes op) throws IOException { + return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null || + // to support old clients + op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null; + } + /** * Derived classes can override this method to customize the filter used to return data visible for the current * transaction. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java index d31b972..1360252 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -1549,4 +1550,65 @@ public class TransactionAwareHTableTest { assertTrue(result.isEmpty()); transactionContext.finish(); } + + /** + * Tests that transaction co-processor works with older clients + * + * @throws Exception + */ + @Test + public void testOlderClientOperations() throws Exception { + // Use old HTable to test + TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable); + transactionContext.addTransactionAware(oldTxAware); + + transactionContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + oldTxAware.put(put); + transactionContext.finish(); + + transactionContext.start(); + long txId = transactionContext.getCurrentTransaction().getTransactionId(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2); + oldTxAware.put(put); + // Invalidate the second Put + TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager); + txClient.invalidate(txId); + + transactionContext.start(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3); + oldTxAware.put(put); + // Abort the third Put + transactionContext.abort(); + + // Get should now return the first value + transactionContext.start(); + Result result = oldTxAware.get(new Get(TestBytes.row)); + transactionContext.finish(); + + byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); + assertArrayEquals(TestBytes.value, value); + } + + /** + * Represents older transaction clients + */ + private static class OldTransactionAwareHTable extends TransactionAwareHTable { + public OldTransactionAwareHTable(HTableInterface hTable) { + super(hTable); + } + + @Override + public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { + op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); + } + + @Override + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java index 3d0fb20..ed8cc25 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java @@ -143,7 +143,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable byte[] family = change.getFamily(); byte[] qualifier = change.getQualifier(); Delete rollbackDelete = new Delete(row); - rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + makeRollbackOperation(rollbackDelete); switch (conflictLevel) { case ROW: case NONE: @@ -671,4 +671,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); } + + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java index 0d6ef17..1cb6564 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java @@ -173,7 +173,7 @@ public class TransactionProcessor extends BaseRegionObserver { // Deletes that are part of a transaction rollback do not need special handling. // They will never be rolled back, so are performed as normal HBase deletes. - if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { + if (isRollbackOperation(delete)) { return; } @@ -301,12 +301,22 @@ public class TransactionProcessor extends BaseRegionObserver { private Transaction getFromOperation(OperationWithAttributes op) throws IOException { byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY); + if (encoded == null) { + // to support old clients + encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY); + } if (encoded != null) { return txCodec.decode(encoded); } return null; } + private boolean isRollbackOperation(OperationWithAttributes op) throws IOException { + return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null || + // to support old clients + op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null; + } + /** * Derived classes can override this method to customize the filter used to return data visible for the current * transaction. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java index 6ff4bb1..0d54e89 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -1549,4 +1550,65 @@ public class TransactionAwareHTableTest { assertTrue(result.isEmpty()); transactionContext.finish(); } + + /** + * Tests that transaction co-processor works with older clients + * + * @throws Exception + */ + @Test + public void testOlderClientOperations() throws Exception { + // Use old HTable to test + TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable); + transactionContext.addTransactionAware(oldTxAware); + + transactionContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + oldTxAware.put(put); + transactionContext.finish(); + + transactionContext.start(); + long txId = transactionContext.getCurrentTransaction().getTransactionId(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2); + oldTxAware.put(put); + // Invalidate the second Put + TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager); + txClient.invalidate(txId); + + transactionContext.start(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3); + oldTxAware.put(put); + // Abort the third Put + transactionContext.abort(); + + // Get should now return the first value + transactionContext.start(); + Result result = oldTxAware.get(new Get(TestBytes.row)); + transactionContext.finish(); + + byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); + assertArrayEquals(TestBytes.value, value); + } + + /** + * Represents older transaction clients + */ + private static class OldTransactionAwareHTable extends TransactionAwareHTable { + public OldTransactionAwareHTable(HTableInterface hTable) { + super(hTable); + } + + @Override + public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { + op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); + } + + @Override + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java index f28930e..4740f6b 100644 --- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java +++ b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java @@ -143,7 +143,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable byte[] family = change.getFamily(); byte[] qualifier = change.getQualifier(); Delete rollbackDelete = new Delete(row); - rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + makeRollbackOperation(rollbackDelete); switch (conflictLevel) { case ROW: case NONE: @@ -671,4 +671,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); } + + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java index 8051279..27a7ef2 100644 --- a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java @@ -173,7 +173,7 @@ public class TransactionProcessor extends BaseRegionObserver { // Deletes that are part of a transaction rollback do not need special handling. // They will never be rolled back, so are performed as normal HBase deletes. - if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { + if (isRollbackOperation(delete)) { return; } @@ -300,12 +300,22 @@ public class TransactionProcessor extends BaseRegionObserver { private Transaction getFromOperation(OperationWithAttributes op) throws IOException { byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY); + if (encoded == null) { + // to support old clients + encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY); + } if (encoded != null) { return txCodec.decode(encoded); } return null; } + private boolean isRollbackOperation(OperationWithAttributes op) throws IOException { + return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null || + // to support old clients + op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null; + } + /** * Derived classes can override this method to customize the filter used to return data visible for the current * transaction. http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java index 8578230..9968fb3 100644 --- a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.OperationWithAttributes; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -1543,4 +1544,65 @@ public class TransactionAwareHTableTest { assertTrue(result.isEmpty()); transactionContext.finish(); } + + /** + * Tests that transaction co-processor works with older clients + * + * @throws Exception + */ + @Test + public void testOlderClientOperations() throws Exception { + // Use old HTable to test + TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable); + transactionContext.addTransactionAware(oldTxAware); + + transactionContext.start(); + Put put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + oldTxAware.put(put); + transactionContext.finish(); + + transactionContext.start(); + long txId = transactionContext.getCurrentTransaction().getTransactionId(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2); + oldTxAware.put(put); + // Invalidate the second Put + TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager); + txClient.invalidate(txId); + + transactionContext.start(); + put = new Put(TestBytes.row); + put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3); + oldTxAware.put(put); + // Abort the third Put + transactionContext.abort(); + + // Get should now return the first value + transactionContext.start(); + Result result = oldTxAware.get(new Get(TestBytes.row)); + transactionContext.finish(); + + byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier); + assertArrayEquals(TestBytes.value, value); + } + + /** + * Represents older transaction clients + */ + private static class OldTransactionAwareHTable extends TransactionAwareHTable { + public OldTransactionAwareHTable(HTableInterface hTable) { + super(hTable); + } + + @Override + public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { + op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); + } + + @Override + protected void makeRollbackOperation(Delete delete) { + delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } + } }
