Connect TAL to Phoenix

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b9929872
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b9929872
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b9929872

Branch: refs/heads/4.x-HBase-1.2
Commit: b9929872fbd93f6b77c0d478aa9f1c5134fbd702
Parents: aec1cd5
Author: Ohad Shacham <[email protected]>
Authored: Sun Jun 25 13:49:35 2017 +0300
Committer: Thomas <[email protected]>
Committed: Mon Jun 26 12:09:11 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java |   4 +-
 .../phoenix/tx/FlappingTransactionIT.java       |  41 +-
 .../phoenix/tx/ParameterizedTransactionIT.java  |   7 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  21 +-
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |  63 +--
 .../phoenix/cache/IndexMetaDataCache.java       |   7 +-
 .../PhoenixTransactionalProcessor.java          |   4 +-
 .../UngroupedAggregateRegionObserver.java       |   6 +-
 .../apache/phoenix/execute/MutationState.java   | 374 +++++---------
 .../apache/phoenix/index/IndexMaintainer.java   |   4 +-
 .../index/IndexMetaDataCacheFactory.java        |  15 +-
 .../phoenix/index/PhoenixIndexMetaData.java     |  14 +-
 .../index/PhoenixTransactionalIndexer.java      |  35 +-
 .../NonAggregateRegionScannerFactory.java       |   5 +-
 .../phoenix/iterate/RegionScannerFactory.java   |   4 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   4 +-
 .../phoenix/query/ConnectionQueryServices.java  |   2 -
 .../query/ConnectionQueryServicesImpl.java      |  62 +--
 .../query/ConnectionlessQueryServicesImpl.java  |  15 +-
 .../query/DelegateConnectionQueryServices.java  |   6 -
 .../apache/phoenix/schema/MetaDataClient.java   |   4 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |   6 +-
 .../transaction/OmidTransactionContext.java     | 174 +++++++
 .../transaction/OmidTransactionTable.java       | 339 ++++++++++++
 .../transaction/PhoenixTransactionContext.java  | 191 +++++++
 .../transaction/PhoenixTransactionalTable.java  | 149 ++++++
 .../transaction/TephraTransactionContext.java   | 516 +++++++++++++++++++
 .../transaction/TephraTransactionTable.java     | 330 ++++++++++++
 .../phoenix/transaction/TransactionFactory.java | 143 +++++
 .../java/org/apache/phoenix/util/IndexUtil.java |   4 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |   4 +-
 .../apache/phoenix/util/TransactionUtil.java    |  31 +-
 .../java/org/apache/phoenix/query/BaseTest.java |  68 +--
 33 files changed, 2131 insertions(+), 521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index cd0c371..0747541 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -271,11 +271,11 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize, int 
maxSizeBytes) {
-                return new MutationState(maxSize, maxSizeBytes, this, 
mutations, null, null);
+                return new MutationState(maxSize, maxSizeBytes, this, 
mutations, false, null);
             };
         };
     }
-    
+
     public static class FailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, 
Put put, WALEdit edit,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 5a990cf..06eac6c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -42,12 +42,11 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
 import org.junit.Test;
 
 /**
@@ -213,8 +212,6 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         String fullTableName = generateUniqueName();
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         
-        TransactionSystemClient txServiceClient = 
pconn.getQueryServices().getTransactionSystemClient();
-
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY 
KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
         HTableInterface htable = 
pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
@@ -227,16 +224,18 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
             assertEquals(1,rs.getInt(1));
         }
 
-        // Use HBase level Tephra APIs to start a new transaction
-        TransactionAwareHTable txAware = new TransactionAwareHTable(htable, 
TxConstants.ConflictDetection.ROW);
-        TransactionContext txContext = new TransactionContext(txServiceClient, 
txAware);
-        txContext.start();
-        
+        PhoenixTransactionContext txContext =
+              
TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+        PhoenixTransactionalTable txTable =
+              
TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, 
htable);
+
+        txContext.begin();
+
         // Use HBase APIs to add a new row
         Put put = new Put(Bytes.toBytes("z"));
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V1"), Bytes.toBytes("b"));
-        txAware.put(put);
+        txTable.put(put);
         
         // Use Phoenix APIs to add new row (sharing the transaction context)
         pconn.setTransactionContext(txContext);
@@ -258,8 +257,8 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(3,rs.getInt(1));
         
-        // Use Tephra APIs directly to finish (i.e. commit) the transaction
-        txContext.finish();
+        // Use TM APIs directly to finish (i.e. commit) the transaction
+        txContext.commit();
         
         // Confirm that attempt to commit row with conflict fails
         try {
@@ -279,14 +278,18 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         }
         
         // Repeat the same as above, but this time abort the transaction
-        txContext = new TransactionContext(txServiceClient, txAware);
-        txContext.start();
+        txContext =
+              
TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+        txTable =
+              
TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, 
htable);
+
+        txContext.begin();
         
         // Use HBase APIs to add a new row
         put = new Put(Bytes.toBytes("j"));
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
         put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes("V1"), Bytes.toBytes("e"));
-        txAware.put(put);
+        txTable.put(put);
         
         // Use Phoenix APIs to add new row (sharing the transaction context)
         pconn.setTransactionContext(txContext);
@@ -302,7 +305,7 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(4,rs.getInt(1));
 
-        // Use Tephra APIs directly to abort (i.e. rollback) the transaction
+        // Use TM APIs directly to abort (i.e. rollback) the transaction
         txContext.abort();
         
         rs = conn.createStatement().executeQuery("select count(*) from " + 
fullTableName);
@@ -325,4 +328,4 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         assertTrue(result.isEmpty());
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index a5c1cf4..fecfd9a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -53,10 +53,10 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TxConstants;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -391,7 +391,10 @@ public class ParameterizedTransactionIT extends 
ParallelStatsDisabledIT {
         admin.createTable(desc);
         ddl = "CREATE TABLE " + t2 + " (k varchar primary key) 
transactional=true";
         conn.createStatement().execute(ddl);
-        assertEquals(Boolean.TRUE.toString(), 
admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA));
+
+        HTableDescriptor htableDescriptor = 
admin.getTableDescriptor(TableName.valueOf(t2));
+        String str = 
htableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA);
+        assertEquals(Boolean.TRUE.toString(), str);
         
         // Should be ok, as HBase metadata should match existing metadata.
         ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index f37d09b..c76e19c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -43,10 +43,10 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TxConstants;
 import org.junit.Test;
 
 public class TransactionIT  extends ParallelStatsDisabledIT {
@@ -147,21 +147,24 @@ public class TransactionIT  extends 
ParallelStatsDisabledIT {
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, 
colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, 
Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = 
colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
 
         desc = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX1"));
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, 
colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, 
Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = 
colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
         
         desc = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX2"));
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, 
colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, 
Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = 
colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
         
         conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "2(k 
INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
@@ -170,14 +173,15 @@ public class TransactionIT  extends 
ParallelStatsDisabledIT {
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(10, colDesc.getMaxVersions());
             assertEquals(HColumnDescriptor.DEFAULT_TTL, 
colDesc.getTimeToLive());
-            assertEquals(null, colDesc.getValue(TxConstants.PROPERTY_TTL));
+            assertEquals(null, 
colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL));
         }
         conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 
SET TTL=1000");
         desc = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes(
 nonTxTableName + "2"));
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(10, colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, 
Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = 
colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
 
         conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "3(k 
INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
@@ -207,7 +211,8 @@ public class TransactionIT  extends ParallelStatsDisabledIT 
{
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             
assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, 
colDesc.getMaxVersions());
             assertEquals(HColumnDescriptor.DEFAULT_TTL, 
colDesc.getTimeToLive());
-            assertEquals(1000, 
Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = 
colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
     }
     
@@ -291,4 +296,4 @@ public class TransactionIT  extends ParallelStatsDisabledIT 
{
             conn.close();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index cb3b4b3..989a97e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -37,9 +37,9 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableImpl;
+import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
-import org.apache.tephra.Transaction.VisibilityLevel;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -53,21 +53,21 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
 
        public TxCheckpointIT(boolean localIndex, boolean mutable, boolean 
columnEncoded) {
            StringBuilder optionBuilder = new StringBuilder();
-               this.localIndex = localIndex;
-               if (!columnEncoded) {
-            if (optionBuilder.length()!=0)
-                optionBuilder.append(",");
-            optionBuilder.append("COLUMN_ENCODED_BYTES=0");
-        }
-        if (!mutable) {
-            if (optionBuilder.length()!=0)
-                optionBuilder.append(",");
-            optionBuilder.append("IMMUTABLE_ROWS=true");
-            if (!columnEncoded) {
-                
optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
-            }
-        }
-        this.tableDDLOptions = optionBuilder.toString();
+           this.localIndex = localIndex;
+           if (!columnEncoded) {
+               if (optionBuilder.length()!=0)
+                   optionBuilder.append(",");
+               optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+           }
+           if (!mutable) {
+               if (optionBuilder.length()!=0)
+                   optionBuilder.append(",");
+               optionBuilder.append("IMMUTABLE_ROWS=true");
+               if (!columnEncoded) {
+                   
optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+               }
+           }
+           this.tableDDLOptions = optionBuilder.toString();
        }
        
     private static Connection getConnection() throws SQLException {
@@ -83,8 +83,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
        
@Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") 
// name is used by failsafe as file name in reports
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false, false, false }, { false, false, true }, { false, 
true, false }, { false, true, true },
-                 { true, false, false }, { true, false, true }, { true, true, 
false }, { true, true, true }
+                { false, false, false }, { false, false, true }, { false, 
true, false }, { false, true, true },
+                { true, false, false }, { true, false, true }, { true, true, 
false }, { true, true, true }
            });
     }
     
@@ -101,7 +101,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
         Connection conn = getConnection(props);
         conn.setAutoCommit(true);
         conn.createStatement().execute("CREATE SEQUENCE "+seqName);
-        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk 
INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions);
+        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk 
INTEGER PRIMARY KEY, val INTEGER)"+tableDDLOptions);
         conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : 
"")+"INDEX " + indexName + " ON " + fullTableName + "(val)");
 
         conn.createStatement().execute("UPSERT INTO " + fullTableName + " 
VALUES (NEXT VALUE FOR " + seqName + ",1)");
@@ -132,11 +132,12 @@ public class TxCheckpointIT extends 
ParallelStatsDisabledIT {
     }
     
     private void testRollbackOfUncommittedDelete(String indexDDL, String 
fullTableName) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = getConnection();
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY 
KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY 
KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions);
             stmt.execute(indexDDL);
             
             stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 
'y1', 'a1')");
@@ -220,11 +221,13 @@ public class TxCheckpointIT extends 
ParallelStatsDisabledIT {
         String tableName = "TBL_" + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(tableName, tableName);
+               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
                try (Connection conn = getConnection()) {
                        conn.setAutoCommit(false);
                        Statement stmt = conn.createStatement();
 
-                       stmt.execute("CREATE TABLE " + fullTableName + "(ID 
BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+                       stmt.execute("CREATE TABLE " + fullTableName + "(ID 
BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"
+                                       + tableDDLOptions);
                        stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
                                        + "INDEX " + indexName + " ON " + 
fullTableName + " (v1) INCLUDE(v2)");
 
@@ -266,7 +269,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
                long wp = state.getWritePointer();
                conn.createStatement().execute(
                                "upsert into " + fullTableName + " select 
max(id)+1, 'a4', 'b4' from " + fullTableName + "");
-               assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+               assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
                                state.getVisibilityLevel());
                assertEquals(wp, state.getWritePointer()); // Make sure write 
ptr
                                                                                
                        // didn't move
@@ -278,7 +281,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
 
                conn.createStatement().execute(
                                "upsert into " + fullTableName + " select 
max(id)+1, 'a5', 'b5' from " + fullTableName + "");
-               assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+               assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
                                state.getVisibilityLevel());
                assertNotEquals(wp, state.getWritePointer()); // Make sure 
write ptr
                                                                                
                                // moves
@@ -291,7 +294,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
                
                conn.createStatement().execute(
                                "upsert into " + fullTableName + " select 
max(id)+1, 'a6', 'b6' from " + fullTableName + "");
-               assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+               assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
                                state.getVisibilityLevel());
                assertNotEquals(wp, state.getWritePointer()); // Make sure 
write ptr
                                                                                
                                // moves
@@ -313,8 +316,10 @@ public class TxCheckpointIT extends 
ParallelStatsDisabledIT {
                try (Connection conn = getConnection()) {
                        conn.setAutoCommit(false);
                        Statement stmt = conn.createStatement();
-                       stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 
BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + tableDDLOptions);
-                       stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 
BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + tableDDLOptions);
+                       stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 
BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)"
+                                       + tableDDLOptions);
+                       stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 
BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)"
+                                       + tableDDLOptions);
                        stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
                                        + "INDEX " + indexName + " ON " + 
fullTableName + "1 (FK1B)");
                        
@@ -328,7 +333,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
                state.startTransaction();
                long wp = state.getWritePointer();
                conn.createStatement().execute("delete from " + fullTableName + 
"1 where id1=fk1b AND fk1b=id1");
-               assertEquals(VisibilityLevel.SNAPSHOT, 
state.getVisibilityLevel());
+               assertEquals(PhoenixVisibilityLevel.SNAPSHOT, 
state.getVisibilityLevel());
                assertEquals(wp, state.getWritePointer()); // Make sure write 
ptr didn't move
        
                rs = conn.createStatement().executeQuery("select /*+ NO_INDEX 
*/ id1 from " + fullTableName + "1");
@@ -346,7 +351,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
                assertFalse(rs.next());
        
                conn.createStatement().execute("delete from " + fullTableName + 
"1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName 
+ "2 on (fk2=id1))");
-               assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, 
state.getVisibilityLevel());
+               assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, 
state.getVisibilityLevel());
                assertNotEquals(wp, state.getWritePointer()); // Make sure 
write ptr moved
        
                rs = conn.createStatement().executeQuery("select /*+ NO_INDEX 
*/ id1 from " + fullTableName + "1");
@@ -363,7 +368,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
             stmt.executeUpdate("upsert into " + fullTableName + "2 values (2, 
4)");
 
             conn.createStatement().execute("delete from " + fullTableName + "1 
where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + 
"2 on (fk2=id1))");
-            assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, 
state.getVisibilityLevel());
+            assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, 
state.getVisibilityLevel());
             assertNotEquals(wp, state.getWritePointer()); // Make sure write 
ptr moved
     
             rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ 
id1 from " + fullTableName + "1");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index d22993c..16207c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,9 +23,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.tephra.Transaction;
-
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 
 public interface IndexMetaDataCache extends Closeable {
     public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new 
IndexMetaDataCache() {
@@ -40,11 +39,11 @@ public interface IndexMetaDataCache extends Closeable {
         }
 
         @Override
-        public Transaction getTransaction() {
+        public PhoenixTransactionContext getTransactionContext() {
             return null;
         }
         
     };
     public List<IndexMaintainer> getIndexMaintainers();
-    public Transaction getTransaction();
+    public PhoenixTransactionContext getTransactionContext();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
index 8693681..37fa2ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
@@ -17,12 +17,12 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.phoenix.transaction.TransactionFactory;
 
 public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
 
     public PhoenixTransactionalProcessor() {
-        super(new TransactionProcessor());
+        
super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoProcessor());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 75654c9..d9e864e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -108,6 +108,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ExpressionUtil;
@@ -118,7 +119,6 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TimeKeeper;
-import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -609,7 +609,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             }
                             mutations.add(delete);
                             // force tephra to ignore this deletes
-                            delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+                            delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                         } else if (isUpsert) {
                             Arrays.fill(values, null);
                             int bucketNumOffset = 0;
@@ -676,7 +676,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     results.get(0).getRowLength());
                                 delete.deleteColumns(deleteCF,  deleteCQ, ts);
                                 // force tephra to ignore this deletes
-                                delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+                                delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                                 mutations.add(delete);
                             }
                         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index cc37402..cfb19f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -85,6 +85,10 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -95,17 +99,6 @@ import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.Transaction.VisibilityLevel;
-import org.apache.tephra.TransactionAware;
-import org.apache.tephra.TransactionCodec;
-import org.apache.tephra.TransactionConflictException;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.hbase.TransactionAwareHTable;
-import org.apache.tephra.visibility.FenceWait;
-import org.apache.tephra.visibility.VisibilityFence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,10 +115,9 @@ import com.google.common.collect.Sets;
  */
 public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
-    private static final TransactionCodec CODEC = new TransactionCodec();
     private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
     private static final int MAX_COMMIT_RETRIES = 3;
-    
+
     private final PhoenixConnection connection;
     private final long maxSize;
     private final long maxSizeBytes;
@@ -133,48 +125,47 @@ public class MutationState implements SQLCloseable {
     private final long batchSizeBytes;
     private long batchCount = 0L;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
-    private final List<TransactionAware> txAwares;
-    private final TransactionContext txContext;
     private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
-    
-    private Transaction tx;
+
     private long sizeOffset;
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
     private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
-    
+
+    final PhoenixTransactionContext phoenixTransactionContext;
+
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
 
     public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        this(maxSize, maxSizeBytes, connection, null, null);
+        this(maxSize, maxSizeBytes, connection, false, null);
     }
-    
-    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, TransactionContext txContext) {
-        this(maxSize, maxSizeBytes, connection, null, txContext);
+
+    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, PhoenixTransactionContext txContext) {
+        this(maxSize, maxSizeBytes, connection, false, txContext);
     }
-    
+
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, mutationState.getTransaction(), null);
+        this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, true, mutationState.getPhoenixTransactionContext());
     }
-    
+
     public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, null, null, sizeOffset);
+        this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
     }
-    
-    private MutationState(long maxSize, long maxSizeBytes,PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
-        this(maxSize, maxSizeBytes, connection, tx, txContext, 0);
+
+    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext) {
+        this(maxSize, maxSizeBytes, connection, subTask, txContext, 0);
     }
-    
-    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext);
+
+    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
-    
-    MutationState(long maxSize, long maxSizeBytes,
-            PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, Transaction tx, TransactionContext txContext) {
+
+    MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
         this.connection = connection;
@@ -184,115 +175,65 @@ public class MutationState implements SQLCloseable {
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-        this.tx = tx;
-        if (tx == null) {
-            this.txAwares = Collections.emptyList();
+        if (subTask == false) {
             if (txContext == null) {
-                TransactionSystemClient txServiceClient = this.connection
-                        .getQueryServices().getTransactionSystemClient();
-                this.txContext = new TransactionContext(txServiceClient);
+                phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection);
             } else {
                 isExternalTxContext = true;
-                this.txContext = txContext;
+                phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
             }
         } else {
             // this code path is only used while running child scans, we can't pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
-            this.txAwares = Lists.newArrayList();
-            this.txContext = null;
+            phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
         }
     }
 
     public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
-        this(maxSize, maxSizeBytes, connection, null, null, sizeOffset);
+        this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
-        this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
-    
+
     public long getMaxSize() {
         return maxSize;
     }
-    
+
     public long getMaxSizeBytes() {
         return maxSizeBytes;
     }
-    
+
+    public PhoenixTransactionContext getPhoenixTransactionContext() {
+        return phoenixTransactionContext;
+    }
+
     /**
      * Commit a write fence when creating an index so that we can detect
      * when a data table transaction is started before the create index
      * but completes after it. In this case, we need to rerun the data
      * table transaction after the index creation so that the index rows
-     * are generated. See {@link #addDMLFence(PTable)} and TEPHRA-157
+     * are generated. See TEPHRA-157
      * for more information.
      * @param dataTable the data table upon which an index is being added
      * @throws SQLException
      */
     public void commitDDLFence(PTable dataTable) throws SQLException {
         if (dataTable.isTransactional()) {
-            byte[] key = dataTable.getName().getBytes();
-            boolean success = false;
             try {
-                FenceWait fenceWait = VisibilityFence.prepareWait(key, 
connection.getQueryServices().getTransactionSystemClient());
-                fenceWait.await(10000, TimeUnit.MILLISECONDS);
-                success = true;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
-            } catch (TimeoutException | TransactionFailureException e) {
-                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
-                .setSchemaName(dataTable.getSchemaName().getString())
-                .setTableName(dataTable.getTableName().getString())
-                .build().buildException();
+                phoenixTransactionContext.commitDDLFence(dataTable, logger);
             } finally {
                 // The client expects a transaction to be in progress on the 
txContext while the
                 // VisibilityFence.prepareWait() starts a new tx and 
finishes/aborts it. After it's
                 // finished, we start a new one here.
                 // TODO: seems like an autonomous tx capability in Tephra 
would be useful here.
-                try {
-                    txContext.start();
-                    if (logger.isInfoEnabled() && success) logger.info("Added 
write fence at ~" + getTransaction().getReadPointer());
-                } catch (TransactionFailureException e) {
-                    throw TransactionUtil.getTransactionFailureException(e);
-                }
-            }
-        }
-    }
-    
-    /**
-     * Add an entry to the change set representing the DML operation that is 
starting.
-     * These entries will not conflict with each other, but they will conflict 
with a
-     * DDL operation of creating an index. See {@link #addDMLFence(PTable)} 
and TEPHRA-157
-     * for more information.
-     * @param table the table which is doing DML
-     * @throws SQLException
-     */
-    private void addDMLFence(PTable table) throws SQLException {
-        if (table.getType() == PTableType.INDEX || !table.isTransactional()) {
-            return;
-        }
-        byte[] logicalKey = table.getName().getBytes();
-        TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
-        if (this.txContext == null) {
-            this.txAwares.add(logicalTxAware);
-        } else {
-            this.txContext.addTransactionAware(logicalTxAware);
-        }
-        byte[] physicalKey = table.getPhysicalName().getBytes();
-        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
-            TransactionAware physicalTxAware = 
VisibilityFence.create(physicalKey);
-            if (this.txContext == null) {
-                this.txAwares.add(physicalTxAware);
-            } else {
-                this.txContext.addTransactionAware(physicalTxAware);
+                phoenixTransactionContext.begin();
             }
         }
     }
     
     public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
-        Transaction currentTx = getTransaction();
-        if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
+        if (! phoenixTransactionContext.isTransactionRunning()  || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
             return false;
         }
         Set<TableRef> sources = plan.getSourceRefs();
@@ -332,118 +273,77 @@ public class MutationState implements SQLCloseable {
                     break;
                 }
             }
+
+            phoenixTransactionContext.checkpoint(hasUncommittedData);
+
             if (hasUncommittedData) {
-                try {
-                    if (txContext == null) {
-                        currentTx = tx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
-                    }  else {
-                        txContext.checkpoint();
-                        currentTx = tx = txContext.getCurrentTransaction();
-                    }
-                    // Since we've checkpointed, we can clear out uncommitted 
set, since a statement run afterwards
-                    // should see all this data.
-                    uncommittedPhysicalNames.clear();
-                } catch (TransactionFailureException e) {
-                    throw new SQLException(e);
-                } 
+                uncommittedPhysicalNames.clear();
             }
-            // Since we're querying our own table while mutating it, we must 
exclude
-            // see our current mutations, otherwise we can get erroneous 
results (for DELETE)
-            // or get into an infinite loop (for UPSERT SELECT).
-            currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
             return true;
         }
         return false;
     }
-    
-    private void addTransactionParticipant(TransactionAware txAware) throws 
SQLException {
-        if (txContext == null) {
-            txAwares.add(txAware);
-            assert(tx != null);
-            txAware.startTx(tx);
-        } else {
-            txContext.addTransactionAware(txAware);
-        }
-    }
-    
+
     // Though MutationState is not thread safe in general, this method should 
be because it may
     // be called by TableResultIterator in a multi-threaded manner. Since we 
do not want to expose
     // the Transaction outside of MutationState, this seems reasonable, as the 
member variables
     // would not change as these threads are running.
     public HTableInterface getHTable(PTable table) throws SQLException {
         HTableInterface htable = 
this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
-        Transaction currentTx;
-        if (table.isTransactional() && (currentTx=getTransaction()) != null) {
-            TransactionAwareHTable txAware = 
TransactionUtil.getTransactionAwareHTable(htable, table.isImmutableRows());
+        if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
+            PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table);
             // Using cloned mutationState as we may have started a new 
transaction already
             // if auto commit is true and we need to use the original one here.
-            txAware.startTx(currentTx);
-            htable = txAware;
+            htable = phoenixTransactionTable;
         }
         return htable;
     }
-    
+
     public PhoenixConnection getConnection() {
         return connection;
     }
-    
-    // Kept private as the Transaction may change when check pointed. Keeping 
it private ensures
-    // no one holds on to a stale copy.
-    private Transaction getTransaction() {
-        return tx != null ? tx : txContext != null ? 
txContext.getCurrentTransaction() : null;
-    }
-    
+
     public boolean isTransactionStarted() {
-        return getTransaction() != null;
+        return phoenixTransactionContext.isTransactionRunning();
     }
-    
+
     public long getInitialWritePointer() {
-        Transaction tx = getTransaction();
-        return tx == null ? HConstants.LATEST_TIMESTAMP : 
tx.getTransactionId(); // First write pointer - won't change with checkpointing
+        return phoenixTransactionContext.getTransactionId(); // First write 
pointer - won't change with checkpointing
     }
-    
+
     // For testing
     public long getWritePointer() {
-        Transaction tx = getTransaction();
-        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer();
+        return phoenixTransactionContext.getWritePointer();
     }
-    
+
     // For testing
-    public VisibilityLevel getVisibilityLevel() {
-        Transaction tx = getTransaction();
-        return tx == null ? null : tx.getVisibilityLevel();
+    public PhoenixVisibilityLevel getVisibilityLevel() {
+        return phoenixTransactionContext.getVisibilityLevel();
     }
-    
+
     public boolean startTransaction() throws SQLException {
-        if (txContext == null) {
-            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
-        }
-        
         if (connection.getSCN() != null) {
             throw new SQLExceptionInfo.Builder(
                     SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
                     .build().buildException();
         }
-        
-        try {
-            if (!isTransactionStarted()) {
-                // Clear any transactional state in case transaction was ended 
outside
-                // of Phoenix so we don't carry the old transaction state 
forward. We
-                // cannot call reset() here due to the case of having 
mutations and
-                // then transitioning from non transactional to transactional 
(which
-                // would end up clearing our uncommitted state).
-                resetTransactionalState();
-                txContext.start();
-                return true;
-            }
-        } catch (TransactionFailureException e) {
-            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException();
+
+        if (!isTransactionStarted()) {
+            // Clear any transactional state in case transaction was ended 
outside
+            // of Phoenix so we don't carry the old transaction state forward. 
We
+            // cannot call reset() here due to the case of having mutations and
+            // then transitioning from non transactional to transactional 
(which
+            // would end up clearing our uncommitted state).
+            resetTransactionalState();
+            phoenixTransactionContext.begin();
+            return true;
         }
+
         return false;
     }
 
     public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
+        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -529,13 +429,9 @@ public class MutationState implements SQLCloseable {
         if (this == newMutationState) { // Doesn't make sense
             return;
         }
-        if (txContext != null) {
-            for (TransactionAware txAware : newMutationState.txAwares) {
-                txContext.addTransactionAware(txAware);
-            }
-        } else {
-            txAwares.addAll(newMutationState.txAwares);
-        }
+
+        phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
+
         this.sizeOffset += newMutationState.sizeOffset;
         joinMutationState(newMutationState.mutations, this.mutations);
         if (!newMutationState.txMutations.isEmpty()) {
@@ -552,7 +448,7 @@ public class MutationState implements SQLCloseable {
         }
         throwIfTooBig();
     }
-    
+
 
     private static ImmutableBytesPtr 
getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable 
table) {
         RowKeySchema schema = table.getRowKeySchema();
@@ -1084,26 +980,16 @@ public class MutationState implements SQLCloseable {
                         if (table.isTransactional()) {
                             // Track tables to which we've sent uncommitted 
data
                             txTableRefs.add(origTableRef);
-                            addDMLFence(table);
                             
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                            
+
                             // If we have indexes, wrap the HTable in a 
delegate HTable that
                             // will attach the necessary index meta data in 
the event of a
                             // rollback
                             if (!table.getIndexes().isEmpty()) {
                                 hTable = new MetaDataAwareHTable(hTable, 
origTableRef);
                             }
-                            TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable, table.isImmutableRows());
-                            // Don't add immutable indexes (those are the only 
ones that would participate
-                            // during a commit), as we don't need conflict 
detection for these.
-                            if (tableInfo.isDataTable()) {
-                                // Even for immutable, we need to do this so 
that an abort has the state
-                                // necessary to generate the rows to delete.
-                                addTransactionParticipant(txnAware);
-                            } else {
-                                txnAware.startTx(getTransaction());
-                            }
-                            hTable = txnAware;
+
+                            hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table);
                         }
                         
                         long numMutations = mutationList.size();
@@ -1209,15 +1095,11 @@ public class MutationState implements SQLCloseable {
     }
 
     public byte[] encodeTransaction() throws SQLException {
-        try {
-            return CODEC.encode(getTransaction());
-        } catch (IOException e) {
-            throw new SQLException(e);
-        }
+        return phoenixTransactionContext.encodeTransaction();
     }
-    
-    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
-        return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
+
+    public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException {
+        return TransactionFactory.getTransactionFactory().getTransactionContext(txnBytes);
     }
 
     private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? 
extends Mutation> mutations,
@@ -1294,29 +1176,22 @@ public class MutationState implements SQLCloseable {
         this.mutations.clear();
         resetTransactionalState();
     }
-    
+
     private void resetTransactionalState() {
-        tx = null;
-        txAwares.clear();
+        phoenixTransactionContext.reset();
         txMutations = Collections.emptyMap();
         uncommittedPhysicalNames.clear();
         uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
-    
+
     public void rollback() throws SQLException {
         try {
-            if (txContext != null && isTransactionStarted()) {
-                try {
-                    txContext.abort();
-                } catch (TransactionFailureException e) {
-                    throw TransactionUtil.getTransactionFailureException(e);
-                }
-            }
+            phoenixTransactionContext.abort();
         } finally {
             resetState();
         }
     }
-    
+
     public void commit() throws SQLException {
         Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = 
Collections.emptyMap();
         int retryCount = 0;
@@ -1332,38 +1207,32 @@ public class MutationState implements SQLCloseable {
                 sqlE = e;
             } finally {
                 try {
-                    if (txContext != null && isTransactionStarted()) {
-                        TransactionFailureException txFailure = null;
-                        boolean finishSuccessful=false;
-                        try {
-                            if (sendSuccessful) {
-                                txContext.finish();
-                                finishSuccessful = true;
-                            }
-                        } catch (TransactionFailureException e) {
-                            if (logger.isInfoEnabled()) 
logger.info(e.getClass().getName() + " at timestamp " + 
getInitialWritePointer() + " with retry count of " + retryCount);
-                            retryCommit = (e instanceof 
TransactionConflictException && retryCount < MAX_COMMIT_RETRIES);
-                            txFailure = e;
-                            SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
-                            if (sqlE == null) {
-                                sqlE = nextE;
-                            } else {
-                                sqlE.setNextException(nextE);
-                            }
-                        } finally {
-                            // If send fails or finish fails, abort the tx
-                            if (!finishSuccessful) {
-                                try {
-                                    txContext.abort(txFailure);
-                                    if (logger.isInfoEnabled()) 
logger.info("Abort successful");
-                                } catch (TransactionFailureException e) {
-                                    if (logger.isInfoEnabled()) 
logger.info("Abort failed with " + e);
-                                    SQLException nextE = 
TransactionUtil.getTransactionFailureException(e);
-                                    if (sqlE == null) {
-                                        sqlE = nextE;
-                                    } else {
-                                        sqlE.setNextException(nextE);
-                                    }
+                    boolean finishSuccessful=false;
+                    try {
+                        if (sendSuccessful) {
+                            phoenixTransactionContext.commit();
+                            finishSuccessful = true;
+                        }
+                    } catch (SQLException e) {
+                        if (logger.isInfoEnabled()) 
logger.info(e.getClass().getName() + " at timestamp " + 
getInitialWritePointer() + " with retry count of " + retryCount);
+                        retryCommit = (e.getErrorCode() == 
SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode() && retryCount < 
MAX_COMMIT_RETRIES);
+                        if (sqlE == null) {
+                            sqlE = e;
+                        } else {
+                            sqlE.setNextException(e);
+                        }
+                    } finally {
+                        // If send fails or finish fails, abort the tx
+                        if (!finishSuccessful) {
+                            try {
+                                phoenixTransactionContext.abort();
+                                if (logger.isInfoEnabled()) logger.info("Abort 
successful");
+                            } catch (SQLException e) {
+                                if (logger.isInfoEnabled()) logger.info("Abort 
failed with " + e);
+                                if (sqlE == null) {
+                                    sqlE = e;
+                                } else {
+                                    sqlE.setNextException(e);
                                 }
                             }
                         }
@@ -1376,10 +1245,6 @@ public class MutationState implements SQLCloseable {
                             startTransaction();
                             // Add back read fences
                             Set<TableRef> txTableRefs = txMutations.keySet();
-                            for (TableRef tableRef : txTableRefs) {
-                                PTable dataTable = tableRef.getTable();
-                                addDMLFence(dataTable);
-                            }
                             try {
                                 // Only retry if an index was added
                                 retryCommit = 
shouldResubmitTransaction(txTableRefs);
@@ -1465,12 +1330,13 @@ public class MutationState implements SQLCloseable {
      * @throws SQLException
      */
     public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {
-        Transaction currentTx = getTransaction();
-        if (currentTx != null) {
+
+        if (phoenixTransactionContext.isTransactionRunning()) {
             // Initialize visibility so that transactions see their own writes.
             // The checkpoint() method will set it to not see writes if 
necessary.
-            currentTx.setVisibility(VisibilityLevel.SNAPSHOT);
+            phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT);
         }
+
         Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new 
Predicate<TableRef>(){
             @Override
             public boolean apply(TableRef tableRef) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index be22334..a7ea99a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -100,6 +100,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -108,7 +109,6 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-import org.apache.tephra.TxConstants;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -1046,7 +1046,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             }
                else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
                                // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor
-                               || (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
+                               || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
                    nDeleteCF++;
                }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 9edcafc..18b9edd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -24,15 +24,13 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.tephra.Transaction;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
-import org.apache.phoenix.util.TransactionUtil;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 
 public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     public IndexMetaDataCacheFactory() {
@@ -49,11 +47,12 @@ public class IndexMetaDataCacheFactory implements 
ServerCacheFactory {
     @Override
     public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] 
txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws 
SQLException {
         // just use the standard keyvalue builder - this doesn't really need 
to be fast
+        
         final List<IndexMaintainer> maintainers = 
                 IndexMaintainer.deserialize(cachePtr, 
GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer);
-        final Transaction txn;
+        final PhoenixTransactionContext txnContext;
         try {
-            txn = txState.length!=0 ? MutationState.decodeTransaction(txState) 
: null;
+            txnContext = txState.length != 0 ? 
TransactionFactory.getTransactionFactory().getTransactionContext(txState) : 
null;
         } catch (IOException e) {
             throw new SQLException(e);
         }
@@ -70,8 +69,8 @@ public class IndexMetaDataCacheFactory implements 
ServerCacheFactory {
             }
 
             @Override
-            public Transaction getTransaction() {
-                return txn;
+            public PhoenixTransactionContext getTransactionContext() {
+                return txnContext;
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 39473dc..fa2fed2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -30,12 +30,12 @@ import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.tephra.Transaction;
 
 public class PhoenixIndexMetaData implements IndexMetaData {
     private final Map<String, byte[]> attributes;
@@ -56,7 +56,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
             final List<IndexMaintainer> indexMaintainers = 
IndexMaintainer.deserialize(md, useProto);
-            final Transaction txn = MutationState.decodeTransaction(txState);
+            final PhoenixTransactionContext txnContext = 
TransactionFactory.getTransactionFactory().getTransactionContext(txState);
             return new IndexMetaDataCache() {
 
                 @Override
@@ -68,8 +68,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
                 }
 
                 @Override
-                public Transaction getTransaction() {
-                    return txn;
+                public PhoenixTransactionContext getTransactionContext() {
+                    return txnContext;
                 }
 
             };
@@ -101,8 +101,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         this.ignoreNewerMutations = 
attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
     }
     
-    public Transaction getTransaction() {
-        return indexMetaDataCache.getTransaction();
+    public PhoenixTransactionContext getTransactionContext() {
+        return indexMetaDataCache.getTransactionContext();
     }
     
     public List<IndexMaintainer> getIndexMaintainers() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 9726d2c..d8a7e3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -70,14 +70,14 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.Transaction.VisibilityLevel;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -152,7 +152,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
 
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
-        byte[] txRollbackAttribute = 
m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
+        byte[] txRollbackAttribute = 
m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of 
if statements
         try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
@@ -189,14 +189,14 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
     }
     
     private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData 
indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) 
throws IOException {
-        Transaction tx = indexMetaData.getTransaction();
-        if (tx == null) {
+        PhoenixTransactionContext txnContext = 
indexMetaData.getTransactionContext();
+        if (txnContext == null) {
             throw new NullPointerException("Expected to find transaction in 
metadata for " + env.getRegionInfo().getTable().getNameAsString());
         }
         boolean isRollback = txRollbackAttribute!=null;
         boolean isImmutable = indexMetaData.isImmutableRows();
         ResultScanner currentScanner = null;
-        TransactionAwareHTable txTable = null;
+        PhoenixTransactionalTable txTable = null;
         // Collect up all mutations in batch
         Map<ImmutableBytesPtr, MultiMutation> mutations =
                 new HashMap<ImmutableBytesPtr, MultiMutation>();
@@ -261,23 +261,22 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 scanRanges.initializeScan(scan);
                 TableName tableName = 
env.getRegion().getRegionInfo().getTable();
                 HTableInterface htable = env.getTable(tableName);
-                txTable = new TransactionAwareHTable(htable);
-                txTable.startTx(tx);
+                txTable = 
TransactionFactory.getTransactionFactory().getTransactionalTable(txnContext, 
htable);
                 // For rollback, we need to see all versions, including
                 // the last committed version as there may be multiple
                 // checkpointed versions.
                 SkipScanFilter filter = scanRanges.getSkipScanFilter();
                 if (isRollback) {
                     filter = new SkipScanFilter(filter,true);
-                    tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL);
+                    txnContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
                 }
                 scan.setFilter(filter);
                 currentScanner = txTable.getScanner(scan);
             }
             if (isRollback) {
-                processRollback(env, indexMetaData, txRollbackAttribute, 
currentScanner, tx, mutableColumns, indexUpdates, mutations);
+                processRollback(env, indexMetaData, txRollbackAttribute, 
currentScanner, txnContext, mutableColumns, indexUpdates, mutations);
             } else {
-                processMutation(env, indexMetaData, txRollbackAttribute, 
currentScanner, tx, mutableColumns, indexUpdates, mutations, 
findPriorValueMutations);
+                processMutation(env, indexMetaData, txRollbackAttribute, 
currentScanner, txnContext, mutableColumns, indexUpdates, mutations, 
findPriorValueMutations);
             }
         } finally {
             if (txTable != null) txTable.close();
@@ -300,7 +299,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
     private void processMutation(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Transaction tx, 
+            PhoenixTransactionContext txnContext, 
             Set<ColumnReference> upsertColumns, 
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             Map<ImmutableBytesPtr, MultiMutation> mutations,
@@ -312,14 +311,14 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             // Process existing data table rows by removing the old index row 
and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new 
ImmutableBytesPtr(result.getRow()));
-                TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
+                TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), txnContext.getWritePointer(), m, emptyColRef, 
result);
                 generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
                 generatePuts(indexMetaData, indexUpdates, state);
             }
         }
         // Process new data table by adding new index rows
         for (Mutation m : mutations.values()) {
-            TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m);
+            TxTableState state = new TxTableState(env, upsertColumns, 
indexMetaData.getAttributes(), txnContext.getWritePointer(), m);
             generatePuts(indexMetaData, indexUpdates, state);
         }
     }
@@ -327,7 +326,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
     private void processRollback(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Transaction tx, Set<ColumnReference> mutableColumns,
+            PhoenixTransactionContext tx, Set<ColumnReference> mutableColumns,
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             Map<ImmutableBytesPtr, MultiMutation> mutations) throws 
IOException {
         if (scanner != null) {
@@ -414,7 +413,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, 
indexMetaData);
         for (IndexUpdate delete : deletes) {
             if (delete.isValid()) {
-                delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
+                delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
                 indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 3e7a6ca..4c64a4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
@@ -54,11 +55,11 @@ import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.tephra.Transaction;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -111,7 +112,7 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
     Region dataRegion = null;
     IndexMaintainer indexMaintainer = null;
     byte[][] viewConstants = null;
-    Transaction tx = null;
+    PhoenixTransactionContext tx = null;
     ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
     if (dataColumns != null) {
       tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index c88727d..898a573 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.iterate;
 
 import com.google.common.collect.ImmutableList;
+
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -38,6 +39,7 @@ import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.*;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -95,7 +97,7 @@ public abstract class RegionScannerFactory {
       final Expression[] arrayFuncRefs, final int offset, final Scan scan,
       final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
       final Region dataRegion, final IndexMaintainer indexMaintainer,
-      Transaction tx,
+      PhoenixTransactionContext tx,
       final byte[][] viewConstants, final KeyValueSchema kvSchema,
       final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
       final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) 
{

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 42259b1..ccbd955 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -104,6 +104,7 @@ import org.apache.phoenix.schema.types.PUnsignedDate;
 import org.apache.phoenix.schema.types.PUnsignedTime;
 import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.NumberUtil;
@@ -113,7 +114,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.SchemaUtil;
-import org.apache.tephra.TransactionContext;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -658,7 +658,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         mutationState.sendUncommitted();
     }
         
-    public void setTransactionContext(TransactionContext txContext) throws 
SQLException {
+    public void setTransactionContext(PhoenixTransactionContext txContext) 
throws SQLException {
         if 
(!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, 
QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT)
             .build().buildException();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b9929872/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 38580e4..45ab5fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -46,7 +46,6 @@ import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
-import org.apache.tephra.TransactionSystemClient;
 
 
 public interface ConnectionQueryServices extends QueryServices, 
MetaDataMutated {
@@ -132,7 +131,6 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
     public long clearCache() throws SQLException;
     public int getSequenceSaltBuckets();
 
-    TransactionSystemClient getTransactionSystemClient();
     public long getRenewLeaseThresholdMilliSeconds();
     public boolean isRenewingLeasesEnabled();
 

Reply via email to