PHOENIX-4605 Support running multiple transaction providers

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8eaca121
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8eaca121
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8eaca121

Branch: refs/heads/4.x-cdh5.12
Commit: 8eaca12102c41a10694e9a343f97b8b83c15701c
Parents: 0535a17
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu Apr 12 04:06:35 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Fri Apr 13 23:30:46 2018 +0100

----------------------------------------------------------------------
 .../phoenix/end2end/AlterTableWithViewsIT.java  |   6 +-
 .../ConnectionQueryServicesTestImpl.java        |  34 +-
 .../phoenix/tx/FlappingTransactionIT.java       |  11 +-
 .../phoenix/tx/ParameterizedTransactionIT.java  |  14 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  12 +
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |   5 +-
 .../phoenix/cache/IndexMetaDataCache.java       |   5 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |   3 +-
 .../apache/phoenix/compile/FromCompiler.java    |   2 +-
 .../apache/phoenix/compile/JoinCompiler.java    |   2 +-
 .../compile/TupleProjectionCompiler.java        |   4 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   6 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   4 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  26 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   2 +-
 .../coprocessor/MetaDataRegionObserver.java     |   4 +-
 .../PhoenixTransactionalProcessor.java          |  28 --
 .../coprocessor/ServerCachingEndpointImpl.java  |   4 +-
 .../TephraTransactionalProcessor.java           |  29 ++
 .../UngroupedAggregateRegionObserver.java       |  10 +-
 .../coprocessor/generated/PTableProtos.java     | 110 +++++-
 .../phoenix/exception/SQLExceptionCode.java     |   4 +
 .../apache/phoenix/execute/BaseQueryPlan.java   |   3 +
 .../apache/phoenix/execute/MutationState.java   |  74 ++--
 .../PhoenixTxIndexMutationGenerator.java        |  10 +-
 .../phoenix/expression/ExpressionType.java      | 119 +-----
 .../TransactionProviderNameFunction.java        |  81 +++++
 .../apache/phoenix/index/IndexMaintainer.java   |   4 +-
 .../index/IndexMetaDataCacheFactory.java        |   2 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |   1 -
 .../index/PhoenixIndexMetaDataBuilder.java      |   7 +-
 .../NonAggregateRegionScannerFactory.java       |   5 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  12 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   2 +-
 .../index/PhoenixIndexPartialBuildMapper.java   |   4 +-
 .../phoenix/query/ConnectionQueryServices.java  |   6 +-
 .../query/ConnectionQueryServicesImpl.java      |  71 ++--
 .../query/ConnectionlessQueryServicesImpl.java  |  11 +-
 .../query/DelegateConnectionQueryServices.java  |  13 +-
 .../apache/phoenix/query/QueryConstants.java    |   2 +
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../phoenix/query/QueryServicesOptions.java     |  10 +-
 .../apache/phoenix/schema/DelegateTable.java    |   8 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 129 +++++--
 .../java/org/apache/phoenix/schema/PTable.java  |   2 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  68 ++--
 .../apache/phoenix/schema/TableProperty.java    |  18 +
 .../transaction/OmidTransactionContext.java     |  57 +--
 .../transaction/OmidTransactionProvider.java    |  54 +--
 .../transaction/OmidTransactionTable.java       | 363 -------------------
 .../transaction/PhoenixTransactionClient.java   |  23 ++
 .../transaction/PhoenixTransactionContext.java  | 169 +++++----
 .../transaction/PhoenixTransactionProvider.java |  51 +++
 .../transaction/PhoenixTransactionService.java  |  24 ++
 .../transaction/PhoenixTransactionalTable.java  | 149 --------
 .../transaction/TephraTransactionContext.java   | 205 +++--------
 .../transaction/TephraTransactionProvider.java  | 161 +++++++-
 .../transaction/TephraTransactionTable.java     | 350 ------------------
 .../phoenix/transaction/TransactionFactory.java |  57 ++-
 .../transaction/TransactionProvider.java        |  36 --
 .../org/apache/phoenix/util/PhoenixRuntime.java |   3 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  15 +
 .../apache/phoenix/util/TransactionUtil.java    |  93 ++++-
 .../phoenix/execute/CorrelatePlanTest.java      |   5 +-
 .../execute/LiteralResultIteratorPlanTest.java  |   5 +-
 .../java/org/apache/phoenix/query/BaseTest.java |  26 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  15 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |   2 +-
 phoenix-protocol/src/main/PTable.proto          |   1 +
 70 files changed, 1229 insertions(+), 1625 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index 6b57148..237a8d2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -36,7 +36,7 @@ import java.util.Collection;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
@@ -748,7 +748,7 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
             PName tenantId = isMultiTenant ? PNameFactory.newName("tenant1") : 
null;
             PhoenixConnection phoenixConn = 
conn.unwrap(PhoenixConnection.class);
             HTableInterface htable = 
phoenixConn.getQueryServices().getTable(Bytes.toBytes(baseTableName));
-            
assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+            
assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
             assertFalse(phoenixConn.getTable(new PTableKey(null, 
baseTableName)).isTransactional());
             assertFalse(viewConn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(tenantId, viewOfTable)).isTransactional());
             
@@ -757,7 +757,7 @@ public class AlterTableWithViewsIT extends 
ParallelStatsDisabledIT {
             // query the view to force the table cache to be updated
             viewConn.createStatement().execute("SELECT * FROM " + viewOfTable);
             htable = 
phoenixConn.getQueryServices().getTable(Bytes.toBytes(baseTableName));
-            
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+            
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
             assertTrue(phoenixConn.getTable(new PTableKey(null, 
baseTableName)).isTransactional());
             assertTrue(viewConn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(tenantId, viewOfTable)).isTransactional());
         } 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index a1ad1ad..3db93b0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,7 +30,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.query.ConnectionQueryServicesImpl;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.transaction.PhoenixTransactionClient;
+import org.apache.phoenix.transaction.PhoenixTransactionService;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.SQLCloseables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
@@ -42,9 +49,11 @@ import com.google.common.collect.Sets;
  * @since 0.1
  */
 public class ConnectionQueryServicesTestImpl extends 
ConnectionQueryServicesImpl {
+    private static final Logger logger = 
LoggerFactory.getLogger(ConnectionQueryServicesTestImpl.class);
     protected int NUM_SLAVES_BASE = 1; // number of slaves for the cluster
     // Track open connections to free them on close as unit tests don't always 
do this.
     private Set<PhoenixConnection> connections = Sets.newHashSet();
+    private final PhoenixTransactionService[] txServices = new 
PhoenixTransactionService[TransactionFactory.Provider.values().length];
     
     public ConnectionQueryServicesTestImpl(QueryServices services, 
ConnectionInfo info, Properties props) throws SQLException {
         super(services, info, props);
@@ -68,12 +77,33 @@ public class ConnectionQueryServicesTestImpl extends 
ConnectionQueryServicesImpl
                 // Make copy to prevent ConcurrentModificationException (TODO: 
figure out why this is necessary)
                 connections = new ArrayList<>(this.connections);
                 this.connections = Sets.newHashSet();
+                
+                // shut down the tx client service if we created one to 
support transactions
+                for (PhoenixTransactionService service : txServices) {
+                    if (service != null) {
+                        try {
+                            service.close();
+                        } catch (IOException e) {
+                            logger.warn(e.getMessage(), e);
+                        }
+                    }
+                }
+
             }
             SQLCloseables.closeAll(connections);
-             long unfreedBytes = clearCache();
-             assertEquals("Found unfreed bytes in server-side cache", 0, 
unfreedBytes);
+            long unfreedBytes = clearCache();
+            assertEquals("Found unfreed bytes in server-side cache", 0, 
unfreedBytes);
         } finally {
             super.close();
         }
     }
+    
+    @Override
+    public synchronized PhoenixTransactionClient 
initTransactionClient(Provider provider) {
+        PhoenixTransactionService txService = txServices[provider.ordinal()];
+        if (txService == null) {
+            txService = txServices[provider.ordinal()] = 
provider.getTransactionProvider().getTransactionService(config, connectionInfo);
+        }
+        return super.initTransactionClient(provider);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 200cf1c..0323c32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -43,7 +43,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -225,9 +224,9 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         }
 
         PhoenixTransactionContext txContext =
-              
TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
-        PhoenixTransactionalTable txTable =
-              
TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, 
htable);
+              
TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
+        HTableInterface txTable =
+                txContext.getTransactionalTable(htable, false);
 
         txContext.begin();
 
@@ -277,9 +276,9 @@ public class FlappingTransactionIT extends 
ParallelStatsDisabledIT {
         
         // Repeat the same as above, but this time abort the transaction
         txContext =
-              
TransactionFactory.getTransactionProvider().getTransactionContext(pconn);
+              
TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
         txTable =
-              
TransactionFactory.getTransactionProvider().getTransactionalTable(txContext, 
htable);
+              txContext.getTransactionalTable(htable, false);
 
         txContext.begin();
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index fecfd9a..5421801 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 package org.apache.phoenix.tx;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -280,9 +280,9 @@ public class ParameterizedTransactionIT extends 
ParallelStatsDisabledIT {
         conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET 
TRANSACTIONAL=true");
         
         htable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( 
nonTxTableName));
-        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         htable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(index));
-        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
 
         conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " 
VALUES (4, 'c')");
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ 
NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL");
@@ -357,7 +357,7 @@ public class ParameterizedTransactionIT extends 
ParallelStatsDisabledIT {
         assertFalse(rs.next());
         
         htable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM."
 + nonTxTableName));
-        
assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        
assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
                 getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)).
                 
getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
@@ -375,7 +375,7 @@ public class ParameterizedTransactionIT extends 
ParallelStatsDisabledIT {
         PTable table = pconn.getTable(new PTableKey(null, t1));
         HTableInterface htable = 
pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
         
         try {
             ddl = "ALTER TABLE " + t1 + " SET transactional=false";
@@ -409,7 +409,7 @@ public class ParameterizedTransactionIT extends 
ParallelStatsDisabledIT {
         table = pconn.getTable(new PTableKey(null, t1));
         htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
+        
assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 9286c2e..f1344e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -21,6 +21,7 @@ import static 
org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -44,6 +45,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -129,6 +131,7 @@ public class TransactionIT  extends ParallelStatsDisabledIT 
{
             assertTrue(rs.next());
             assertEquals("Transactional table was not marked as transactional 
in JDBC API.",
                 "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
+            assertEquals(TransactionFactory.Provider.TEPHRA.name(), 
rs.getString(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
 
             String nonTransactTableName = generateUniqueName();
             Statement stmt2 = conn.createStatement();
@@ -139,6 +142,15 @@ public class TransactionIT  extends 
ParallelStatsDisabledIT {
             assertTrue(rs2.next());
             assertEquals("Non-transactional table was marked as transactional 
in JDBC API.",
                 "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL));
+            
assertNull(rs2.getString(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
+            
+            try {
+                stmt.execute("CREATE TABLE " + transactTableName + " (k 
VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " +
+                        "TRANSACTION_PROVIDER=foo");
+                fail();
+            } catch (SQLException e) {
+                
assertEquals(SQLExceptionCode.UNKNOWN_TRANSACTION_PROVIDER.getErrorCode(), 
e.getErrorCode());
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index bf39dfe..42e2102 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableImpl;
 import 
org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Test;
@@ -265,7 +266,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
                ResultSet rs;
                MutationState state = conn.unwrap(PhoenixConnection.class)
                                .getMutationState();
-               state.startTransaction();
+               state.startTransaction(TransactionFactory.Provider.TEPHRA);
                long wp = state.getWritePointer();
                conn.createStatement().execute(
                                "upsert into " + fullTableName + " select 
max(id)+1, 'a4', 'b4' from " + fullTableName + "");
@@ -329,7 +330,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT 
{
                        conn.commit();
 
                MutationState state = 
conn.unwrap(PhoenixConnection.class).getMutationState();
-               state.startTransaction();
+               state.startTransaction(TransactionFactory.Provider.TEPHRA);
                long wp = state.getWritePointer();
                conn.createStatement().execute("delete from " + fullTableName + 
"1 where id1=fk1b AND fk1b=id1");
                assertEquals(PhoenixVisibilityLevel.SNAPSHOT, 
state.getVisibilityLevel());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index 17e6fb6..9f3dd59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,12 +23,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.util.ScanUtil;
 
 public interface IndexMetaDataCache extends Closeable {
-    public static int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 
0);
     public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new 
IndexMetaDataCache() {
 
         @Override
@@ -47,7 +46,7 @@ public interface IndexMetaDataCache extends Closeable {
         
         @Override
         public int getClientVersion() {
-            return UNKNOWN_CLIENT_VERSION;
+            return ScanUtil.UNKNOWN_CLIENT_VERSION;
         }
         
     };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 7985314..5f9c76c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -783,8 +782,8 @@ public class DeleteCompiler {
                     byte[] uuidValue = ServerCacheClient.generateId();
                     
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                     
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                    
context.getScan().setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                     
context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                    ScanUtil.setClientVersion(context.getScan(), 
MetaDataProtocol.PHOENIX_VERSION);
                 }
                 ResultIterator iterator = aggPlan.iterator();
                 try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index b5293bb..3faada7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -821,7 +821,7 @@ public class FromCompiler {
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, 
PName.EMPTY_NAME, PTableType.SUBQUERY, null,
                     MetaDataProtocol.MIN_TABLE_TIMESTAMP, 
PTable.INITIAL_SEQ_NUM, null, null, columns, null, null,
                     Collections.<PTable> emptyList(), false, 
Collections.<PName> emptyList(), null, null, false, false,
-                    false, null, null, null, false, false, 0, 0L, SchemaUtil
+                    false, null, null, null, false, null, 0, 0L, SchemaUtil
                             .isNamespaceMappingEnabled(PTableType.SUBQUERY, 
connection.getQueryServices().getProps()), null, false, 
ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, 
PTable.EncodedCQCounter.NULL_COUNTER, true);
 
             String alias = subselectNode.getAlias();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 88e8f50..824d933 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1276,7 +1276,7 @@ public class JoinCompiler {
                 left.getBucketNum(), merged, left.getParentSchemaName(), 
left.getParentTableName(), left.getIndexes(),
                 left.isImmutableRows(), Collections.<PName> emptyList(), null, 
null, PTable.DEFAULT_DISABLE_WAL,
                 left.isMultiTenant(), left.getStoreNulls(), 
left.getViewType(), left.getViewIndexId(),
-                left.getIndexType(), left.rowKeyOrderOptimizable(), 
left.isTransactional(),
+                left.getIndexType(), left.rowKeyOrderOptimizable(), 
left.getTransactionProvider(),
                 left.getUpdateCacheFrequency(), 
left.getIndexDisableTimestamp(), left.isNamespaceMapped(), 
                 left.getAutoPartitionSeqName(), left.isAppendOnlySchema(), 
ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, 
PTable.EncodedCQCounter.NULL_COUNTER, left.useStatsForParallelization());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 6e52cd5..91be356 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -167,7 +167,7 @@ public class TupleProjectionCompiler {
                 table.getParentTableName(), table.getIndexes(), 
table.isImmutableRows(), Collections.<PName> emptyList(),
                 table.getDefaultFamilyName(), table.getViewStatement(), 
table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), 
table.getViewType(),
                 table.getViewIndexId(),
-                table.getIndexType(), table.rowKeyOrderOptimizable(), 
table.isTransactional(), table.getUpdateCacheFrequency(), 
+                table.getIndexType(), table.rowKeyOrderOptimizable(), 
table.getTransactionProvider(), table.getUpdateCacheFrequency(), 
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
     
@@ -198,7 +198,7 @@ public class TupleProjectionCompiler {
                 table.getBucketNum(), projectedColumns, null, null,
                 Collections.<PTable> emptyList(), table.isImmutableRows(), 
Collections.<PName> emptyList(), null, null,
                 table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(),
-                table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), 
table.isTransactional(),
+                table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), 
table.getTransactionProvider(),
                 table.getUpdateCacheFrequency(), 
table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), 
table.getImmutableStorageScheme(), table.getEncodingScheme(), cqCounter, 
table.useStatsForParallelization());
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index d5bfef8..9ca92f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -35,12 +35,12 @@ import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -100,7 +100,7 @@ public class UnionCompiler {
             UNION_SCHEMA_NAME, UNION_TABLE_NAME, PTableType.SUBQUERY, null,
             HConstants.LATEST_TIMESTAMP, scn == null ? 
HConstants.LATEST_TIMESTAMP : scn,
             null, null, projectedColumns, null, null, null, true, null, null, 
null, true,
-            true, true, null, null, null, false, false, 0, 0L,
+            true, true, null, null, null, false, null, 0, 0L,
             SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY,
                 statement.getConnection().getQueryServices().getProps()), 
null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, 
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, 
PTable.EncodedCQCounter.NULL_COUNTER, true);
         TableRef tableRef = new TableRef(null, tempTable, 0, false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 1a9a686..22119a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -1020,8 +1020,8 @@ public class UpsertCompiler {
                 byte[] uuidValue = ServerCacheClient.generateId();
                 scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                scan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                ScanUtil.setClientVersion(scan, 
MetaDataProtocol.PHOENIX_VERSION);
             }
             ResultIterator iterator = aggPlan.iterator();
             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 3310131..a426e39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -49,10 +48,8 @@ import 
org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.iterate.RegionScannerFactory;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
-import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
@@ -109,6 +106,7 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
     public final static String QUALIFIER_ENCODING_SCHEME = 
"_QualifierEncodingScheme";
     public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = 
"_ImmutableStorageEncodingScheme";
     public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = 
"_UseEncodedColumnQualifierList";
+    public static final String CLIENT_VERSION = "_ClientVersion";
     
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = 
PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = 
PUnsignedTinyint.INSTANCE.toBytes(2);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 05ad959..a2d008b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -69,6 +69,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION_BYTES;
@@ -228,6 +229,7 @@ import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -295,6 +297,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final KeyValue BASE_COLUMN_COUNT_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
     private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
     private static final KeyValue TRANSACTIONAL_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
TRANSACTIONAL_BYTES);
+    private static final KeyValue TRANSACTION_PROVIDER_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
TRANSACTION_PROVIDER_BYTES);
     private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
UPDATE_CACHE_FREQUENCY_BYTES);
     private static final KeyValue IS_NAMESPACE_MAPPED_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
@@ -326,6 +329,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
             BASE_COLUMN_COUNT_KV,
             ROW_KEY_ORDER_OPTIMIZABLE_KV,
             TRANSACTIONAL_KV,
+            TRANSACTION_PROVIDER_KV,
             UPDATE_CACHE_FREQUENCY_KV,
             IS_NAMESPACE_MAPPED_KV,
             AUTO_PARTITION_SEQ_KV,
@@ -357,6 +361,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
     private static final int BASE_COLUMN_COUNT_INDEX = 
TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
     private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = 
TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
     private static final int TRANSACTIONAL_INDEX = 
TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
+    private static final int TRANSACTION_PROVIDER_INDEX = 
TABLE_KV_COLUMNS.indexOf(TRANSACTION_PROVIDER_KV);
     private static final int UPDATE_CACHE_FREQUENCY_INDEX = 
TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
     private static final int INDEX_DISABLE_TIMESTAMP = 
TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
     private static final int IS_NAMESPACE_MAPPED_INDEX = 
TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
@@ -961,7 +966,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol 
implements Coprocesso
         Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
         boolean storeNulls = storeNullsKv == null ? false : 
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), 
storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
         Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
-        boolean transactional = transactionalKv == null ? false : 
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(transactionalKv.getValueArray(), 
transactionalKv.getValueOffset(), transactionalKv.getValueLength()));
+        Cell transactionProviderKv = 
tableKeyValues[TRANSACTION_PROVIDER_INDEX];
+        TransactionFactory.Provider transactionProvider = null;
+        if (transactionProviderKv == null) {
+            if (transactionalKv != null && Boolean.TRUE.equals(
+                    PBoolean.INSTANCE.toObject(
+                            transactionalKv.getValueArray(), 
+                            transactionalKv.getValueOffset(), 
+                            transactionalKv.getValueLength()))) {
+                // For backward compat, prior to client setting TRANSACTION_PROVIDER
+                transactionProvider = TransactionFactory.Provider.TEPHRA;
+            }
+        } else {
+            transactionProvider = TransactionFactory.Provider.fromCode(
+                    PTinyint.INSTANCE.getCodec().decodeByte(
+                            transactionProviderKv.getValueArray(),
+                            transactionProviderKv.getValueOffset(), 
+                            SortOrder.getDefault()));
+        }
         Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
         ViewType viewType = viewTypeKv == null ? null : 
ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
         Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
@@ -1044,7 +1066,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
         return PTableImpl.makePTable(tenantId, schemaName, tableName, 
tableType, indexState, timeStamp, tableSeqNum,
                 pkName, saltBucketNum, columns, parentSchemaName, 
parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
                 viewStatement, disableWAL, multiTenant, storeNulls, viewType, 
viewIndexId, indexType,
-                rowKeyOrderOptimizable, transactional, updateCacheFrequency, 
baseColumnCount,
+                rowKeyOrderOptimizable, transactionProvider, 
updateCacheFrequency, baseColumnCount,
                 indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, 
isAppendOnlySchema, storageScheme, encodingScheme, cqCounter, 
useStatsForParallelization);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 4c4c96f..a71ce0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -95,7 +95,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
     // Since there's no upgrade code, keep the version the same as the previous version
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = 
MIN_TABLE_TIMESTAMP + 28;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0;
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 393a2f9..4968525 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -83,6 +82,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
@@ -496,7 +496,7 @@ public class MetaDataRegionObserver extends 
BaseRegionObserver {
                                                                        conn);
                                                        byte[] attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
                                                        
dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-                                                       
dataTableScan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+                                                       
ScanUtil.setClientVersion(dataTableScan, MetaDataProtocol.PHOENIX_VERSION);
                             LOG.info("Starting to partially build indexes:" + 
indexesToPartiallyRebuild
                                     + " on data table:" + dataPTable.getName() 
+ " with the earliest disable timestamp:"
                                     + earliestDisableTimestamp + " till "

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
deleted file mode 100644
index 0c26ecc..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.coprocessor;
-
-import org.apache.phoenix.transaction.TransactionFactory;
-
-public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
-
-    public PhoenixTransactionalProcessor() {
-        
super(TransactionFactory.getTransactionProvider().getTransactionContext().getCoprocessor());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 448f61c..9d78659 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
@@ -37,6 +36,7 @@ import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachin
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.protobuf.RpcCallback;
@@ -75,7 +75,7 @@ public class ServerCachingEndpointImpl extends 
ServerCachingService implements C
           ServerCacheFactory cacheFactory = 
serverCacheFactoryClass.newInstance();
           tenantCache.addServerCache(new 
ImmutableBytesPtr(request.getCacheId().toByteArray()),
               cachePtr, txState, cacheFactory, 
request.hasHasProtoBufIndexMaintainer() && 
request.getHasProtoBufIndexMaintainer(),
-              request.hasClientVersion() ? request.getClientVersion() : 
IndexMetaDataCache.UNKNOWN_CLIENT_VERSION);
+              request.hasClientVersion() ? request.getClientVersion() : 
ScanUtil.UNKNOWN_CLIENT_VERSION);
         } catch (Throwable e) {
             ProtobufUtil.setControllerException(controller,
                 ServerUtil.createIOException("Error when adding cache: ", e));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
new file mode 100644
index 0000000..a702bc4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TephraTransactionalProcessor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+
+public class TephraTransactionalProcessor extends DelegateRegionObserver {
+
+    public TephraTransactionalProcessor() {
+        super(new TransactionProcessor());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index de57772..31b512a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -277,7 +277,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
            }
            if (clientVersionBytes != null) {
-               m.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, 
clientVersionBytes);
+               m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, 
clientVersionBytes);
            }
         }
     }
@@ -523,7 +523,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             useIndexProto = false;
         }
 
-        byte[] clientVersionBytes = 
scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
+        byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean acquiredLock = false;
         boolean incrScanRefCount = false;
         final TenantCache tenantCache = GlobalCache.getTenantCache(env, 
ScanUtil.getTenantId(scan));
@@ -1023,7 +1023,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             useProto = false;
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
-        byte[] clientVersionBytes = 
scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION);
+        byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
         boolean hasMore;
         int rowCount = 0;
         try {
@@ -1048,7 +1048,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                     put.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     put.setAttribute(REPLAY_WRITES, 
REPLAY_ONLY_INDEX_WRITES);
-                                    
put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+                                    
put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(put);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
                                     put.setDurability(Durability.SKIP_WAL);
@@ -1060,7 +1060,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                     del.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
                                     
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                     del.setAttribute(REPLAY_WRITES, 
REPLAY_ONLY_INDEX_WRITES);
-                                    
del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes);
+                                    
del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
                                     mutations.add(del);
                                     // Since we're replaying existing mutations, it makes no sense to write them to the wal
                                     del.setDurability(Durability.SKIP_WAL);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 278f301..8d500e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3520,6 +3520,16 @@ public final class PTableProtos {
      * <code>optional bool useStatsForParallelization = 37;</code>
      */
     boolean getUseStatsForParallelization();
+
+    // optional int32 transactionProvider = 38;
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    boolean hasTransactionProvider();
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    int getTransactionProvider();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3771,6 +3781,11 @@ public final class PTableProtos {
               useStatsForParallelization_ = input.readBool();
               break;
             }
+            case 304: {
+              bitField1_ |= 0x00000001;
+              transactionProvider_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3823,6 +3838,7 @@ public final class PTableProtos {
     }
 
     private int bitField0_;
+    private int bitField1_;
     // required bytes schemaNameBytes = 1;
     public static final int SCHEMANAMEBYTES_FIELD_NUMBER = 1;
     private com.google.protobuf.ByteString schemaNameBytes_;
@@ -4534,6 +4550,22 @@ public final class PTableProtos {
       return useStatsForParallelization_;
     }
 
+    // optional int32 transactionProvider = 38;
+    public static final int TRANSACTIONPROVIDER_FIELD_NUMBER = 38;
+    private int transactionProvider_;
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    public boolean hasTransactionProvider() {
+      return ((bitField1_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int32 transactionProvider = 38;</code>
+     */
+    public int getTransactionProvider() {
+      return transactionProvider_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4571,6 +4603,7 @@ public final class PTableProtos {
       encodingScheme_ = com.google.protobuf.ByteString.EMPTY;
       encodedCQCounters_ = java.util.Collections.emptyList();
       useStatsForParallelization_ = false;
+      transactionProvider_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4746,6 +4779,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x80000000) == 0x80000000)) {
         output.writeBool(37, useStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        output.writeInt32(38, transactionProvider_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4904,6 +4940,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(37, useStatsForParallelization_);
       }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(38, transactionProvider_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5095,6 +5135,11 @@ public final class PTableProtos {
         result = result && (getUseStatsForParallelization()
             == other.getUseStatsForParallelization());
       }
+      result = result && (hasTransactionProvider() == 
other.hasTransactionProvider());
+      if (hasTransactionProvider()) {
+        result = result && (getTransactionProvider()
+            == other.getTransactionProvider());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -5252,6 +5297,10 @@ public final class PTableProtos {
         hash = (37 * hash) + USESTATSFORPARALLELIZATION_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getUseStatsForParallelization());
       }
+      if (hasTransactionProvider()) {
+        hash = (37 * hash) + TRANSACTIONPROVIDER_FIELD_NUMBER;
+        hash = (53 * hash) + getTransactionProvider();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -5448,6 +5497,8 @@ public final class PTableProtos {
         }
         useStatsForParallelization_ = false;
         bitField1_ = (bitField1_ & ~0x00000008);
+        transactionProvider_ = 0;
+        bitField1_ = (bitField1_ & ~0x00000010);
         return this;
       }
 
@@ -5477,6 +5528,7 @@ public final class PTableProtos {
         int from_bitField0_ = bitField0_;
         int from_bitField1_ = bitField1_;
         int to_bitField0_ = 0;
+        int to_bitField1_ = 0;
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
@@ -5637,7 +5689,12 @@ public final class PTableProtos {
           to_bitField0_ |= 0x80000000;
         }
         result.useStatsForParallelization_ = useStatsForParallelization_;
+        if (((from_bitField1_ & 0x00000010) == 0x00000010)) {
+          to_bitField1_ |= 0x00000001;
+        }
+        result.transactionProvider_ = transactionProvider_;
         result.bitField0_ = to_bitField0_;
+        result.bitField1_ = to_bitField1_;
         onBuilt();
         return result;
       }
@@ -5841,6 +5898,9 @@ public final class PTableProtos {
         if (other.hasUseStatsForParallelization()) {
           setUseStatsForParallelization(other.getUseStatsForParallelization());
         }
+        if (other.hasTransactionProvider()) {
+          setTransactionProvider(other.getTransactionProvider());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7920,6 +7980,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int32 transactionProvider = 38;
+      private int transactionProvider_ ;
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public boolean hasTransactionProvider() {
+        return ((bitField1_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public int getTransactionProvider() {
+        return transactionProvider_;
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public Builder setTransactionProvider(int value) {
+        bitField1_ |= 0x00000010;
+        transactionProvider_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 transactionProvider = 38;</code>
+       */
+      public Builder clearTransactionProvider() {
+        bitField1_ = (bitField1_ & ~0x00000010);
+        transactionProvider_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -8587,7 +8680,7 @@ public final class PTableProtos {
       "\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030",
       "\002 \003(\014\022\033\n\023guidePostsByteCount\030\003 \001(\003\022\025\n\rke" +
       "yBytesCount\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001" +
-      "(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts\"\220\007" +
+      "(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts\"\255\007" +
       "\n\006PTable\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tab" +
       "leNameBytes\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.P" +
       "TableType\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenc" +
@@ -8610,12 +8703,13 @@ public final class PTableProtos {
       "\n\017parentNameBytes\030! \001(\014\022\025\n\rstorageScheme" +
       "\030\" \001(\014\022\026\n\016encodingScheme\030# \001(\014\022,\n\021encode" +
       "dCQCounters\030$ \003(\0132\021.EncodedCQCounter\022\"\n\032" +
-      "useStatsForParallelization\030% \001(\010\"6\n\020Enco" +
-      "dedCQCounter\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007count" +
-      "er\030\002 \002(\005*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004US" +
-      "ER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(o" +
-      "rg.apache.phoenix.coprocessor.generatedB",
-      "\014PTableProtosH\001\210\001\001\240\001\001"
+      "useStatsForParallelization\030% \001(\010\022\033\n\023tran" +
+      "sactionProvider\030& \001(\005\"6\n\020EncodedCQCounte" +
+      "r\022\021\n\tcolFamily\030\001 \002(\t\022\017\n\007counter\030\002 \002(\005*A\n" +
+      "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" +
+      "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p",
+      "hoenix.coprocessor.generatedB\014PTableProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -8639,7 +8733,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", "IsNamespaceMapped", "AutoParititonSeqName", "IsAppendOnlySchema", "ParentNameBytes", "StorageScheme", "EncodingScheme", "EncodedCQCounters", "UseStatsForParallelization", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", "IsNamespaceMapped", "AutoParititonSeqName", "IsAppendOnlySchema", "ParentNameBytes", "StorageScheme", "EncodingScheme", "EncodedCQCounters", "UseStatsForParallelization", "TransactionProvider", });
           internal_static_EncodedCQCounter_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_EncodedCQCounter_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9cbc67e..e9e209b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -294,6 +294,10 @@ public enum SQLExceptionCode {
     SEQUENCE_NOT_CASTABLE_TO_AUTO_PARTITION_ID_COLUMN(1086, "44A17", "Sequence Value not castable to auto-partition id column"),
     CANNOT_COERCE_AUTO_PARTITION_ID(1087, "44A18", "Auto-partition id cannot be coerced"),
     CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP(1088, "44A19", "Cannot create an index on a mutable table that has a ROW_TIMESTAMP column."),
+    UNKNOWN_TRANSACTION_PROVIDER(1089,"44A20", "Unknown TRANSACTION_PROVIDER: "),
+    CANNOT_START_TXN_IF_TXN_DISABLED(1091, "44A22", "Cannot start transaction if transactions are disabled."),
+    CANNOT_MIX_TXN_PROVIDERS(1092, "44A23", "Cannot mix transaction providers: "),
+    CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for "),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index b152030..8a8c822 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -252,6 +253,8 @@ public abstract class BaseQueryPlan implements QueryPlan {
             return newIterator(scanGrouper, scan, caches);
         }
         
+        ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+        
         // Set miscellaneous scan attributes. This is the last chance to set them before we
         // clone the scan for each parallelized chunk.
         TableRef tableRef = context.getCurrentTable();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index f6d11a0..ca3e6d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -72,6 +72,8 @@ import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
 import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
@@ -91,8 +93,8 @@ import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -139,7 +141,7 @@ public class MutationState implements SQLCloseable {
     private boolean isExternalTxContext = false;
     private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
 
-    final PhoenixTransactionContext phoenixTransactionContext;
+    private PhoenixTransactionContext phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
 
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -181,17 +183,13 @@ public class MutationState implements SQLCloseable {
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-        if (!subTask) {
-            if (txContext == null) {
-                phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(connection);
-            } else {
-                isExternalTxContext = true;
-                phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
-            }
-        } else {
+        if (subTask) {
             // this code path is only used while running child scans, we can't pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
-            phoenixTransactionContext = TransactionFactory.getTransactionProvider().getTransactionContext(txContext, connection, subTask);
+            phoenixTransactionContext = txContext.newTransactionContext(txContext, subTask);
+        } else if (txContext != null) {
+            isExternalTxContext = true;
+            phoenixTransactionContext = txContext.newTransactionContext(txContext, subTask);
         }
     }
 
@@ -234,7 +232,7 @@ public class MutationState implements SQLCloseable {
     public void commitDDLFence(PTable dataTable) throws SQLException {
         if (dataTable.isTransactional()) {
             try {
-                phoenixTransactionContext.commitDDLFence(dataTable, logger);
+                phoenixTransactionContext.commitDDLFence(dataTable);
             } finally {
                 // The client expects a transaction to be in progress on the txContext while the
                 // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's
@@ -300,14 +298,12 @@ public class MutationState implements SQLCloseable {
     // Though MutationState is not thread safe in general, this method should be because it may
     // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose
     // the Transaction outside of MutationState, this seems reasonable, as the member variables
-    // would not change as these threads are running.
+    // would not change as these threads are running. We also clone mutationState to ensure that
+    // the transaction context won't change due to a commit when auto commit is true.
     public HTableInterface getHTable(PTable table) throws SQLException {
         HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
-            PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table);
-            // Using cloned mutationState as we may have started a new transaction already
-            // if auto commit is true and we need to use the original one here.
-            htable = phoenixTransactionTable;
+            htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows());
         }
         return htable;
     }
@@ -334,13 +330,32 @@ public class MutationState implements SQLCloseable {
         return phoenixTransactionContext.getVisibilityLevel();
     }
 
-    public boolean startTransaction() throws SQLException {
+    public boolean startTransaction(Provider provider) throws SQLException {
+        if (provider == null) {
+            return false;
+        }
+        if (!connection.getQueryServices().getProps().getBoolean(
+                QueryServices.TRANSACTIONS_ENABLED,
+                QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.CANNOT_START_TXN_IF_TXN_DISABLED)
+                    .build().buildException();
+        }
         if (connection.getSCN() != null) {
             throw new SQLExceptionInfo.Builder(
                     SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
                     .build().buildException();
         }
 
+        if (phoenixTransactionContext == PhoenixTransactionContext.NULL_CONTEXT) {
+            phoenixTransactionContext = provider.getTransactionProvider().getTransactionContext(connection);
+        } else {
+            if (provider != phoenixTransactionContext.getProvider()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS)
+                        .setMessage(phoenixTransactionContext.getProvider().name() + " and " + provider.name())
+                        .build().buildException();
+            }
+        }
         if (!isTransactionStarted()) {
             // Clear any transactional state in case transaction was ended outside
             // of Phoenix so we don't carry the old transaction state forward. We
@@ -1069,6 +1084,7 @@ public class MutationState implements SQLCloseable {
                         if (table.isTransactional()) {
                             // Track tables to which we've sent uncommitted data
                             uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                            phoenixTransactionContext.markDMLFence(table);
 
                             // If we have indexes, wrap the HTable in a delegate HTable that
                             // will attach the necessary index meta data in the event of a
@@ -1077,7 +1093,7 @@ public class MutationState implements SQLCloseable {
                                 hTable = new MetaDataAwareHTable(hTable, origTableRef);
                             }
 
-                            hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table);
+                            hTable = phoenixTransactionContext.getTransactionalTable(hTable, table.isImmutableRows());
                         }
                         
                         numMutations = mutationList.size();
@@ -1223,10 +1239,6 @@ public class MutationState implements SQLCloseable {
         return phoenixTransactionContext.encodeTransaction();
     }
 
-    public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException {
-        return TransactionFactory.getTransactionProvider().getTransactionContext(txnBytes);
-    }
-
     private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
             ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
         PTable table = tableRef.getTable();
@@ -1269,7 +1281,7 @@ public class MutationState implements SQLCloseable {
             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
             if (attribValue != null) {
                 mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-                mutation.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+                mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 if (txState.length > 0) {
                     mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
@@ -1301,7 +1313,7 @@ public class MutationState implements SQLCloseable {
         numRows = 0;
         estimatedSize = 0;
         this.mutations.clear();
-        resetTransactionalState();
+        phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT;
     }
 
     private void resetTransactionalState() {
@@ -1365,13 +1377,18 @@ public class MutationState implements SQLCloseable {
                         }
                     }
                 } finally {
+                    TransactionFactory.Provider provider = phoenixTransactionContext.getProvider();
                     try {
                         resetState();
                     } finally {
                         if (retryCommit) {
-                            startTransaction();
+                            startTransaction(provider);
                             // Add back read fences
                             Set<TableRef> txTableRefs = txMutations.keySet();
+                            for (TableRef tableRef : txTableRefs) {
+                                PTable dataTable = tableRef.getTable();
+                                phoenixTransactionContext.markDMLFence(dataTable);
+                            }
                             try {
                                 // Only retry if an index was added
                                 retryCommit = shouldResubmitTransaction(txTableRefs);
@@ -1476,9 +1493,12 @@ public class MutationState implements SQLCloseable {
             List<TableRef> strippedAliases = Lists.newArrayListWithExpectedSize(mutations.keySet().size());
             while (filteredTableRefs.hasNext()) {
                 TableRef tableRef = filteredTableRefs.next();
+                // REVIEW: unclear if we need this given we start transactions when resolving a table
+                if (tableRef.getTable().isTransactional()) {
+                    startTransaction(tableRef.getTable().getTransactionProvider());
+                }
                 strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
             }
-            startTransaction();
             send(strippedAliases.iterator());
             return true;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 7d6154e..6348d6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -31,8 +31,6 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -64,8 +63,6 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.transaction.PhoenixTransactionalTable;
-import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -76,9 +73,6 @@ import com.google.common.primitives.Longs;
 
 
 public class PhoenixTxIndexMutationGenerator {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixTxIndexMutationGenerator.class);
-
     private final PhoenixIndexCodec codec;
     private final PhoenixIndexMetaData indexMetaData;
 
@@ -181,7 +175,7 @@ public class PhoenixTxIndexMutationGenerator {
             scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
             ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
             scanRanges.initializeScan(scan);
-            PhoenixTransactionalTable txTable = TransactionFactory.getTransactionProvider().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
+            Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, isImmutable);
             // For rollback, we need to see all versions, including
             // the last committed version as there may be multiple
             // checkpointed versions.

Reply via email to