Support old clients on operation attribute name change

Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/e0a6e0a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/e0a6e0a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/e0a6e0a5

Branch: refs/heads/master
Commit: e0a6e0a569789afc2872a3a247976b7d08aa463c
Parents: 6cc3adb
Author: poorna <[email protected]>
Authored: Tue May 10 21:46:43 2016 -0700
Committer: poorna <[email protected]>
Committed: Tue May 10 21:46:43 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/tephra/TxConstants.java     |  8 +++
 .../tephra/hbase96/TransactionAwareHTable.java  |  6 +-
 .../coprocessor/TransactionProcessor.java       | 12 +++-
 .../hbase96/TransactionAwareHTableTest.java     | 62 ++++++++++++++++++++
 .../tephra/hbase98/TransactionAwareHTable.java  |  6 +-
 .../coprocessor/TransactionProcessor.java       | 12 +++-
 .../hbase98/TransactionAwareHTableTest.java     | 62 ++++++++++++++++++++
 .../hbase10cdh/TransactionAwareHTable.java      |  6 +-
 .../coprocessor/TransactionProcessor.java       | 12 +++-
 .../hbase10cdh/TransactionAwareHTableTest.java  | 62 ++++++++++++++++++++
 .../tephra/hbase10/TransactionAwareHTable.java  |  6 +-
 .../coprocessor/TransactionProcessor.java       | 12 +++-
 .../hbase10/TransactionAwareHTableTest.java     | 62 ++++++++++++++++++++
 .../tephra/hbase11/TransactionAwareHTable.java  |  6 +-
 .../coprocessor/TransactionProcessor.java       | 12 +++-
 .../hbase11/TransactionAwareHTableTest.java     | 62 ++++++++++++++++++++
 16 files changed, 398 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java 
b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index d962486..61ee3cc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -88,11 +88,19 @@ public class TxConstants {
    */
   public static final String TX_OPERATION_ATTRIBUTE_KEY = "tephra.tx";
   /**
+   * @deprecated This constant is replaced by {@link #TX_OPERATION_ATTRIBUTE_KEY}
+   */
+  public static final String OLD_TX_OPERATION_ATTRIBUTE_KEY = "cask.tx";
+  /**
    * Key used to flag a delete operation as part of a transaction rollback.  This is used so that the
    * {@code TransactionProcessor} coprocessor loaded on a table can differentiate between deletes issued
    * as part of a normal client operation versus those performed when rolling back a transaction.
    */
   public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "tephra.tx.rollback";
+  /**
+   * @deprecated This constant is replaced by {@link #TX_ROLLBACK_ATTRIBUTE_KEY}
+   */
+  public static final String OLD_TX_ROLLBACK_ATTRIBUTE_KEY = 
"cask.tx.rollback";
 
   /**
    * Column qualifier used for a special delete marker tombstone, which identifies an entire column family as deleted.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
 
b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
index f9a3ff3..4633248 100644
--- 
a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
+++ 
b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/TransactionAwareHTable.java
@@ -139,7 +139,7 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
           byte[] family = change.getFamily();
           byte[] qualifier = change.getQualifier();
           Delete rollbackDelete = new Delete(row);
-          rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
new byte[0]);
+          makeRollbackOperation(rollbackDelete);
           switch (conflictLevel) {
             case ROW:
             case NONE:
@@ -612,4 +612,8 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
   public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
     op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
   }
+
+  protected void makeRollbackOperation(Delete delete) {
+    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
 
b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
index 136dedc..c74f98d 100644
--- 
a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
+++ 
b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
     // Deletes that are part of a transaction rollback do not need special 
handling.
     // They will never be rolled back, so are performed as normal HBase 
deletes.
-    if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+    if (isRollbackOperation(delete)) {
       return;
     }
 
@@ -301,12 +301,22 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
   private Transaction getFromOperation(OperationWithAttributes op) throws 
IOException {
     byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+    if (encoded == null) {
+      // to support old clients
+      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+    }
     if (encoded != null) {
       return txCodec.decode(encoded);
     }
     return null;
   }
 
+  private boolean isRollbackOperation(OperationWithAttributes op) throws 
IOException {
+    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+      // to support old clients
+      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+  }
+
   /**
    * Derived classes can override this method to customize the filter used to 
return data visible for the current
    * transaction.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
 
b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
index 6760ed7..1edd4d7 100644
--- 
a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
+++ 
b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase96/TransactionAwareHTableTest.java
@@ -37,6 +37,7 @@ package org.apache.tephra.hbase96;
  import org.apache.hadoop.hbase.client.HBaseAdmin;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.HTableInterface;
+ import org.apache.hadoop.hbase.client.OperationWithAttributes;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1552,4 +1553,65 @@ public class TransactionAwareHTableTest {
     assertTrue(result.isEmpty());
     transactionContext.finish();
   }
+
+  /**
+   * Tests that transaction co-processor works with older clients
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testOlderClientOperations() throws Exception {
+    // Use old HTable to test
+    TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+    transactionContext.addTransactionAware(oldTxAware);
+
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    oldTxAware.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    long txId = transactionContext.getCurrentTransaction().getTransactionId();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+    oldTxAware.put(put);
+    // Invalidate the second Put
+    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    txClient.invalidate(txId);
+
+    transactionContext.start();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+    oldTxAware.put(put);
+    // Abort the third Put
+    transactionContext.abort();
+
+    // Get should now return the first value
+    transactionContext.start();
+    Result result = oldTxAware.get(new Get(TestBytes.row));
+    transactionContext.finish();
+
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  /**
+   * Represents older transaction clients
+   */
+  private static class OldTransactionAwareHTable extends 
TransactionAwareHTable {
+    public OldTransactionAwareHTable(HTableInterface hTable) {
+      super(hTable);
+    }
+
+    @Override
+    public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
+      op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
+    }
+
+    @Override
+    protected void makeRollbackOperation(Delete delete) {
+      delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
 
b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
index 54babd1..9c04d8f 100644
--- 
a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
+++ 
b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/TransactionAwareHTable.java
@@ -143,7 +143,7 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
           byte[] family = change.getFamily();
           byte[] qualifier = change.getQualifier();
           Delete rollbackDelete = new Delete(row);
-          rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
new byte[0]);
+          makeRollbackOperation(rollbackDelete);
           switch (conflictLevel) {
             case ROW:
             case NONE:
@@ -639,4 +639,8 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
   public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
     op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
   }
+
+  protected void makeRollbackOperation(Delete delete) {
+    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
 
b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
index e8e045a..cc1915d 100644
--- 
a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
+++ 
b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase98/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
     // Deletes that are part of a transaction rollback do not need special 
handling.
     // They will never be rolled back, so are performed as normal HBase 
deletes.
-    if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+    if (isRollbackOperation(delete)) {
       return;
     }
 
@@ -301,12 +301,22 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
   private Transaction getFromOperation(OperationWithAttributes op) throws 
IOException {
     byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+    if (encoded == null) {
+      // to support old clients
+      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+    }
     if (encoded != null) {
       return txCodec.decode(encoded);
     }
     return null;
   }
 
+  private boolean isRollbackOperation(OperationWithAttributes op) throws 
IOException {
+    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+      // to support old clients
+      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+  }
+
   /**
    * Derived classes can override this method to customize the filter used to 
return data visible for the current
    * transaction.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
 
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
index 908f9c8..c4062df 100644
--- 
a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
+++ 
b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase98/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1549,4 +1550,65 @@ public class TransactionAwareHTableTest {
     assertTrue(result.isEmpty());
     transactionContext.finish();
   }
+
+  /**
+   * Tests that transaction co-processor works with older clients
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testOlderClientOperations() throws Exception {
+    // Use old HTable to test
+    TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+    transactionContext.addTransactionAware(oldTxAware);
+
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    oldTxAware.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    long txId = transactionContext.getCurrentTransaction().getTransactionId();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+    oldTxAware.put(put);
+    // Invalidate the second Put
+    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    txClient.invalidate(txId);
+
+    transactionContext.start();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+    oldTxAware.put(put);
+    // Abort the third Put
+    transactionContext.abort();
+
+    // Get should now return the first value
+    transactionContext.start();
+    Result result = oldTxAware.get(new Get(TestBytes.row));
+    transactionContext.finish();
+
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  /**
+   * Represents older transaction clients
+   */
+  private static class OldTransactionAwareHTable extends 
TransactionAwareHTable {
+    public OldTransactionAwareHTable(HTableInterface hTable) {
+      super(hTable);
+    }
+
+    @Override
+    public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
+      op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
+    }
+
+    @Override
+    protected void makeRollbackOperation(Delete delete) {
+      delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
 
b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
index ea9fdad..62cdafd 100644
--- 
a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
+++ 
b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/TransactionAwareHTable.java
@@ -143,7 +143,7 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
           byte[] family = change.getFamily();
           byte[] qualifier = change.getQualifier();
           Delete rollbackDelete = new Delete(row);
-          rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
new byte[0]);
+          makeRollbackOperation(rollbackDelete);
           switch (conflictLevel) {
             case ROW:
             case NONE:
@@ -671,4 +671,8 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
   public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
     op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
   }
+
+  protected void makeRollbackOperation(Delete delete) {
+    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
 
b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
index 45ac114..f219373 100644
--- 
a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
+++ 
b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase10cdh/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
     // Deletes that are part of a transaction rollback do not need special 
handling.
     // They will never be rolled back, so are performed as normal HBase 
deletes.
-    if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+    if (isRollbackOperation(delete)) {
       return;
     }
 
@@ -301,12 +301,22 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
   private Transaction getFromOperation(OperationWithAttributes op) throws 
IOException {
     byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+    if (encoded == null) {
+      // to support old clients
+      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+    }
     if (encoded != null) {
       return txCodec.decode(encoded);
     }
     return null;
   }
 
+  private boolean isRollbackOperation(OperationWithAttributes op) throws 
IOException {
+    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+      // to support old clients
+      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+  }
+
   /**
    * Derived classes can override this method to customize the filter used to 
return data visible for the current
    * transaction.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
 
b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
index d31b972..1360252 100644
--- 
a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
+++ 
b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase10cdh/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1549,4 +1550,65 @@ public class TransactionAwareHTableTest {
     assertTrue(result.isEmpty());
     transactionContext.finish();
   }
+
+  /**
+   * Tests that transaction co-processor works with older clients
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testOlderClientOperations() throws Exception {
+    // Use old HTable to test
+    TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+    transactionContext.addTransactionAware(oldTxAware);
+
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    oldTxAware.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    long txId = transactionContext.getCurrentTransaction().getTransactionId();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+    oldTxAware.put(put);
+    // Invalidate the second Put
+    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    txClient.invalidate(txId);
+
+    transactionContext.start();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+    oldTxAware.put(put);
+    // Abort the third Put
+    transactionContext.abort();
+
+    // Get should now return the first value
+    transactionContext.start();
+    Result result = oldTxAware.get(new Get(TestBytes.row));
+    transactionContext.finish();
+
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  /**
+   * Represents older transaction clients
+   */
+  private static class OldTransactionAwareHTable extends 
TransactionAwareHTable {
+    public OldTransactionAwareHTable(HTableInterface hTable) {
+      super(hTable);
+    }
+
+    @Override
+    public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
+      op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
+    }
+
+    @Override
+    protected void makeRollbackOperation(Delete delete) {
+      delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
 
b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
index 3d0fb20..ed8cc25 100644
--- 
a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
+++ 
b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/TransactionAwareHTable.java
@@ -143,7 +143,7 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
           byte[] family = change.getFamily();
           byte[] qualifier = change.getQualifier();
           Delete rollbackDelete = new Delete(row);
-          rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
new byte[0]);
+          makeRollbackOperation(rollbackDelete);
           switch (conflictLevel) {
             case ROW:
             case NONE:
@@ -671,4 +671,8 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
   public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
     op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
   }
+
+  protected void makeRollbackOperation(Delete delete) {
+    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
 
b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
index 0d6ef17..1cb6564 100644
--- 
a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
+++ 
b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase10/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
     // Deletes that are part of a transaction rollback do not need special 
handling.
     // They will never be rolled back, so are performed as normal HBase 
deletes.
-    if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+    if (isRollbackOperation(delete)) {
       return;
     }
 
@@ -301,12 +301,22 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
   private Transaction getFromOperation(OperationWithAttributes op) throws 
IOException {
     byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
+    if (encoded == null) {
+      // to support old clients
+      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+    }
     if (encoded != null) {
       return txCodec.decode(encoded);
     }
     return null;
   }
 
+  private boolean isRollbackOperation(OperationWithAttributes op) throws 
IOException {
+    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+      // to support old clients
+      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+  }
+
   /**
    * Derived classes can override this method to customize the filter used to 
return data visible for the current
    * transaction.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
 
b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
index 6ff4bb1..0d54e89 100644
--- 
a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
+++ 
b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase10/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1549,4 +1550,65 @@ public class TransactionAwareHTableTest {
     assertTrue(result.isEmpty());
     transactionContext.finish();
   }
+
+  /**
+   * Tests that transaction co-processor works with older clients
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testOlderClientOperations() throws Exception {
+    // Use old HTable to test
+    TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+    transactionContext.addTransactionAware(oldTxAware);
+
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    oldTxAware.put(put);
+    transactionContext.finish();
+
+    transactionContext.start();
+    long txId = transactionContext.getCurrentTransaction().getTransactionId();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+    oldTxAware.put(put);
+    // Invalidate the second Put
+    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    txClient.invalidate(txId);
+
+    transactionContext.start();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+    oldTxAware.put(put);
+    // Abort the third Put
+    transactionContext.abort();
+
+    // Get should now return the first value
+    transactionContext.start();
+    Result result = oldTxAware.get(new Get(TestBytes.row));
+    transactionContext.finish();
+
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  /**
+   * Represents older transaction clients
+   */
+  private static class OldTransactionAwareHTable extends 
TransactionAwareHTable {
+    public OldTransactionAwareHTable(HTableInterface hTable) {
+      super(hTable);
+    }
+
+    @Override
+    public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
+      op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
+    }
+
+    @Override
+    protected void makeRollbackOperation(Delete delete) {
+      delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
 
b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
index f28930e..4740f6b 100644
--- 
a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
+++ 
b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/TransactionAwareHTable.java
@@ -143,7 +143,7 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
           byte[] family = change.getFamily();
           byte[] qualifier = change.getQualifier();
           Delete rollbackDelete = new Delete(row);
-          rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
new byte[0]);
+          makeRollbackOperation(rollbackDelete);
           switch (conflictLevel) {
             case ROW:
             case NONE:
@@ -671,4 +671,8 @@ public class TransactionAwareHTable extends 
AbstractTransactionAwareTable
   public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
     op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx));
   }
+
+  protected void makeRollbackOperation(Delete delete) { // overridable hook that marks a Delete as a rollback; the old-client test subclass swaps in the legacy key
+    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); // empty value: the co-processor only tests the attribute for non-null
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
 
b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
index 8051279..27a7ef2 100644
--- 
a/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
+++ 
b/tephra-hbase-compat-1.1/src/main/java/org/apache/tephra/hbase11/coprocessor/TransactionProcessor.java
@@ -173,7 +173,7 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
     // Deletes that are part of a transaction rollback do not need special 
handling.
     // They will never be rolled back, so are performed as normal HBase 
deletes.
-    if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
+    if (isRollbackOperation(delete)) {
       return;
     }
 
@@ -300,12 +300,22 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
   private Transaction getFromOperation(OperationWithAttributes op) throws 
IOException {
     byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY); // current attribute name is tried first
+    if (encoded == null) {
+      // Old clients store the encoded transaction under the previous attribute name, so fall back to it
+      encoded = op.getAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY);
+    }
     if (encoded != null) {
       return txCodec.decode(encoded);
     }
     return null; // neither attribute is set on the operation
   }
 
+  private boolean isRollbackOperation(OperationWithAttributes op) throws 
IOException { // NOTE(review): nothing in this body visibly throws IOException - confirm the throws clause is needed
+    return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null ||
+      // Old clients mark rollback deletes with the previous attribute name; either key counts
+      op.getAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY) != null;
+  }
+
   /**
    * Derived classes can override this method to customize the filter used to 
return data visible for the current
    * transaction.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/e0a6e0a5/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
 
b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
index 8578230..9968fb3 100644
--- 
a/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
+++ 
b/tephra-hbase-compat-1.1/src/test/java/org/apache/tephra/hbase11/TransactionAwareHTableTest.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -1543,4 +1544,65 @@ public class TransactionAwareHTableTest {
     assertTrue(result.isEmpty());
     transactionContext.finish();
   }
+
+  /**
+   * Tests that the transaction co-processor works with older clients, i.e. clients that tag
+   * their operations with the legacy attribute keys (see {@code OldTransactionAwareHTable}).
+   * Three Puts are issued to the same cell: the first transaction finishes normally, the second
+   * is invalidated and the third is aborted. A read in a later transaction must return the
+   * first value only.
+   *
+   * @throws Exception if any table or transaction call made by the test fails
+   */
+  @Test
+  public void testOlderClientOperations() throws Exception {
+    // Route all operations through the table that writes the old attribute keys
+    TransactionAwareHTable oldTxAware = new OldTransactionAwareHTable(hTable);
+    transactionContext.addTransactionAware(oldTxAware);
+
+    transactionContext.start();
+    Put put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value);
+    oldTxAware.put(put);
+    transactionContext.finish(); // first transaction completes; its value is the one expected at the end
+
+    transactionContext.start();
+    long txId = transactionContext.getCurrentTransaction().getTransactionId();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value2);
+    oldTxAware.put(put);
+    // Invalidate the second transaction, so its Put (value2) must not be visible to later reads
+    TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
+    txClient.invalidate(txId);
+
+    transactionContext.start();
+    put = new Put(TestBytes.row);
+    put.add(TestBytes.family, TestBytes.qualifier, TestBytes.value3);
+    oldTxAware.put(put);
+    // Abort the third transaction, so its Put (value3) must not be visible either
+    transactionContext.abort();
+
+    // A Get in a fresh transaction should now return the first value
+    transactionContext.start();
+    Result result = oldTxAware.get(new Get(TestBytes.row));
+    transactionContext.finish();
+
+    byte[] value = result.getValue(TestBytes.family, TestBytes.qualifier);
+    assertArrayEquals(TestBytes.value, value);
+  }
+
+  /**
+   * Represents older transaction clients: a {@link TransactionAwareHTable} that tags its
+   * operations with the legacy attribute names ({@code OLD_TX_OPERATION_ATTRIBUTE_KEY} and
+   * {@code OLD_TX_ROLLBACK_ATTRIBUTE_KEY}) instead of the current ones. Everything else is
+   * inherited unchanged from the parent class.
+   */
+  private static class OldTransactionAwareHTable extends 
TransactionAwareHTable {
+    public OldTransactionAwareHTable(HTableInterface hTable) {
+      super(hTable);
+    }
+
+    @Override
+    public void addToOperation(OperationWithAttributes op, Transaction tx) 
throws IOException {
+      op.setAttribute(TxConstants.OLD_TX_OPERATION_ATTRIBUTE_KEY, 
txCodec.encode(tx)); // encoded transaction is stored under the old key
+    }
+
+    @Override
+    protected void makeRollbackOperation(Delete delete) {
+      delete.setAttribute(TxConstants.OLD_TX_ROLLBACK_ATTRIBUTE_KEY, new 
byte[0]); // rollback marker is stored under the old key, with an empty value
+    }
+  }
 }


Reply via email to