http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7357ef3..450a99f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -109,7 +109,6 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -541,7 +540,11 @@ public class UpsertCompiler {
                 // Disable running upsert select on server side if a table has global mutable secondary indexes on it
                 boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows();
                 boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery();
-                runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit && !table.isTransactional()
+                runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit
+                        // We can run the upsert select for initial index population on server side for transactional
+                        // tables, since those writes do not need to be done transactionally: we gate index
+                        // usage on successfully writing all data rows.
+                        && (!table.isTransactional() || table.getType() == PTableType.INDEX)
                         && !(table.isImmutableRows() && !table.getIndexes().isEmpty())
                         && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1;
             }
@@ -1039,12 +1042,15 @@ public class UpsertCompiler {
             byte[] txState = table.isTransactional() ?
                     connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
 
+            ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+            if (aggPlan.getTableRef().getTable().isTransactional()
+                    || (table.getType() == PTableType.INDEX && table.isTransactional())) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+            }
             if (ptr.getLength() > 0) {
                 byte[] uuidValue = ServerCacheClient.generateId();
                 scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
             }
             ResultIterator iterator = aggPlan.iterator();
             try {

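The reordering in the second hunk is the behavioral piece: the client version is now stamped on every server-side UPSERT SELECT scan, and the transaction state travels whenever the source or target table is transactional, instead of both being set only when index metadata happens to be present. A minimal sketch of that attribute plumbing using the public HBase Scan API; the attribute names and the unknown-version default are illustrative stand-ins for Phoenix's BaseScannerRegionObserver and ScanUtil constants:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanAttributeSketch {
        static final String TX_STATE = "_TxState";              // stand-in name
        static final String CLIENT_VERSION = "_ClientVersion";  // stand-in name
        static final int UNKNOWN_CLIENT_VERSION = 0;            // assumed default

        // Client side: always stamp the version; attach tx state only when needed.
        static Scan prepare(Scan scan, byte[] txState, boolean transactional, int version) {
            scan.setAttribute(CLIENT_VERSION, Bytes.toBytes(version));
            if (transactional) {
                scan.setAttribute(TX_STATE, txState);
            }
            return scan;
        }

        // Server side: decode with a fallback for old clients that never set it.
        static int clientVersion(Scan scan) {
            byte[] b = scan.getAttribute(CLIENT_VERSION);
            return b == null ? UNKNOWN_CLIENT_VERSION : Bytes.toInt(b);
        }
    }
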
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
new file mode 100644
index 0000000..70658fb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+
+
+public class OmidGCProcessor extends DelegateRegionObserver {
+
+    public OmidGCProcessor() {
+        super(new BaseRegionObserver());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
new file mode 100644
index 0000000..fc246d4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+
+
+public class OmidTransactionalProcessor extends DelegateRegionObserver {
+
+    public OmidTransactionalProcessor() {
+        super(new BaseRegionObserver());
+    }
+
+}

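Both Omid coprocessors above are placeholders: each registers a stable class name with the region server while forwarding every callback to a no-op BaseRegionObserver through Phoenix's DelegateRegionObserver. A self-contained sketch of that pattern, with illustrative interfaces standing in for the HBase observer types:

    // Stand-in for the observer interface; real HBase observers have many hooks.
    interface Observer {
        default void preFlush() {}
        default void postCompact() {}
    }

    // Stand-in for DelegateRegionObserver: forwards every hook to a delegate.
    class DelegateObserver implements Observer {
        private final Observer delegate;
        DelegateObserver(Observer delegate) { this.delegate = delegate; }
        @Override public void preFlush() { delegate.preFlush(); }
        @Override public void postCompact() { delegate.postCompact(); }
    }

    // Analogous to OmidGCProcessor / OmidTransactionalProcessor: a stable class
    // name wired to a no-op delegate until the Omid-specific logic lands.
    class PlaceholderProcessor extends DelegateObserver {
        PlaceholderProcessor() { super(new Observer() {}); }
    }
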
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
index aa406fd..0448972 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.ObserverContextImpl;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@@ -59,11 +60,7 @@ import org.apache.hadoop.hbase.security.access.AuthResult;
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
 import org.apache.hadoop.hbase.security.access.UserPermission;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
 import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -72,6 +69,10 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.MetaDataUtil;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
 
 public class PhoenixAccessController extends BaseMetaDataEndpointObserver {
 
@@ -464,7 +465,7 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver {
 
             private void callGetUserPermissionsRequest(final List<UserPermission> userPermissions, AccessControlService.Interface service
                     , AccessControlProtos.GetUserPermissionsRequest request, RpcController controller) {
-                ((AccessControlService.Interface)service).getUserPermissions(controller, request,
+                service.getUserPermissions(controller, request,
                     new RpcCallback<AccessControlProtos.GetUserPermissionsResponse>() {
                         @Override
                         public void run(AccessControlProtos.GetUserPermissionsResponse message) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index e4fc149..d94ce7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -136,6 +136,8 @@ import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -442,6 +444,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES);
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        PhoenixTransactionProvider txnProvider = null;
+        if (txState != null) {
+            int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+            txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
+        }
         List<Expression> selectExpressions = null;
         byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
         boolean isUpsert = false;
@@ -550,7 +558,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 useIndexProto = false;
             }
     
-            byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
             if(needToWrite) {
                 synchronized (lock) {
                     if (isRegionClosingOrSplitting) {
@@ -673,6 +680,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                         valueGetter, ptr, results.get(0).getTimestamp(),
                                         env.getRegion().getRegionInfo().getStartKey(),
                                         env.getRegion().getRegionInfo().getEndKey());
+
+                                    if (txnProvider != null) {
+                                        put = txnProvider.markPutAsCommitted(put, ts, ts);
+                                    }
                                     indexMutations.add(put);
                                 }
                             }
@@ -740,6 +751,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             for (Mutation mutation : row.toRowMutations()) {
                                 if (replayMutations != null) {
                                     mutation.setAttribute(REPLAY_WRITES, replayMutations);
+                                } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) {
+                                    mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
                                 }
                                 mutations.add(mutation);
                             }

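The load-bearing new call in this observer is txnProvider.markPutAsCommitted(put, ts, ts), which rewrites an index Put so readers treat it as already committed and skip transactional conflict handling. For an Omid-style provider this amounts to writing a commit-timestamp "shadow" cell alongside each data cell; the sketch below illustrates the idea under assumed names (the shadow-qualifier suffix is not the provider's actual constant):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ShadowCellSketch {
        private static final byte[] SHADOW_SUFFIX = Bytes.toBytes("_st"); // assumed

        // Rebuild the put with the cell at the txn timestamp plus a shadow cell
        // holding the commit timestamp, so reads need no commit-table lookup.
        static Put markPutAsCommitted(Put put, long ts, long commitTs,
                byte[] family, byte[] qualifier, byte[] value) {
            Put committed = new Put(put.getRow());
            committed.addColumn(family, qualifier, ts, value);
            committed.addColumn(family, Bytes.add(qualifier, SHADOW_SUFFIX), ts,
                    Bytes.toBytes(commitTs));
            return committed;
        }
    }
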
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d6a70f2..c0a81ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -297,7 +297,12 @@ public enum SQLExceptionCode {
     UNKNOWN_TRANSACTION_PROVIDER(1089,"44A20", "Unknown TRANSACTION_PROVIDER: "),
     CANNOT_START_TXN_IF_TXN_DISABLED(1091, "44A22", "Cannot start transaction if transactions are disabled."),
     CANNOT_MIX_TXN_PROVIDERS(1092, "44A23", "Cannot mix transaction providers: "),
-    CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for "),
+    CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for"),
+    UNSUPPORTED_COLUMN_ENCODING_FOR_TXN_PROVIDER(1094, "44A25", "Column encoding is not supported for"),
+    UNSUPPORTED_STORAGE_FORMAT_FOR_TXN_PROVIDER(1095, "44A26", "Only ONE_CELL_PER_COLUMN storage scheme is supported for"),
+    CANNOT_SWITCH_TXN_PROVIDERS(1096, "44A27", "Cannot switch transaction providers."),
+    TTL_UNSUPPORTED_FOR_TXN_TABLE(10947, "44A28", "TTL is not supported for"),
+    CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE(10948, "44A29", "Local indexes cannot be created for"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

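The new codes follow the enum's existing shape: numeric code, SQLSTATE, and a message prefix to which the thrower appends the offending table. Raising one typically goes through Phoenix's SQLExceptionInfo builder; the snippet below is the common idiom sketched from memory, so treat the exact builder methods as an assumption rather than part of this diff:

    import java.sql.SQLException;
    import org.apache.phoenix.exception.SQLExceptionCode;
    import org.apache.phoenix.exception.SQLExceptionInfo;

    // Hypothetical helper: surface the new local-index restriction for a table.
    static SQLException localIndexOnTxnTable(String schemaName, String tableName) {
        return new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                .build().buildException();
    }
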
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index cf17b90..4a75e52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.TraceScope;
-import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -299,7 +297,8 @@ public class MutationState implements SQLCloseable {
     public Table getHTable(PTable table) throws SQLException {
         Table htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
-            htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows());
+            // We're only using this table for reading, so we want it wrapped even if it's an index
+            htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows() || table.getType() == PTableType.INDEX);
         }
         return htable;
     }
@@ -564,8 +563,7 @@ public class MutationState implements SQLCloseable {
                         MultiRowMutationState multiRowMutationState = mutations.remove(key);
                         if (multiRowMutationState != null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState,
-                                    deleteMutations, null);
+                            generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
                             if (indexMutations == null) {
                                 indexMutations = deleteMutations;
                             } else {
@@ -968,7 +966,12 @@ public class MutationState implements SQLCloseable {
                                 uncommittedPhysicalNames.add(table.getPhysicalName().getString());
                                 phoenixTransactionContext.markDMLFence(table);
                             }
-                            hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable());
+                            // Only pass true for the last argument if the index is being written to on its own (i.e. initial
+                            // index population), not if it's being written to for normal maintenance due to writes to
+                            // the data table. This case is different because the initial index population does not need
+                            // to be done transactionally, since the index is only made active after all writes have
+                            // occurred successfully.
+                            hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
                         }
                         numMutations = mutationList.size();
                         GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);

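The long comment above boils down to a two-input decision: keep the transactional wrapper unless the index table is itself the direct target of the write (initial population), because a failed build is simply never activated. Restated as plain Java, with hypothetical booleans standing in for tableInfo.isDataTable() and the PTableType.INDEX check:

    // true  => write the index non-transactionally (initial population)
    // false => keep the transactional writer (normal index maintenance)
    static boolean skipTransactionalWriter(boolean isDirectTargetOfWrite, boolean isIndexTable) {
        return isDirectTargetOfWrite && isIndexTable;
    }
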
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index c209b1f..de13605 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -60,6 +61,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PVarbinary;
@@ -78,14 +80,26 @@ import com.google.common.primitives.Longs;
 public class PhoenixTxIndexMutationGenerator {
     private final PhoenixIndexCodec codec;
     private final PhoenixIndexMetaData indexMetaData;
+    private final ConnectionQueryServices services;
+    private final byte[] regionStartKey;
+    private final byte[] regionEndKey;
+    private final byte[] tableName;
 
-    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+    private PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+        this.services = services;
         this.indexMetaData = indexMetaData;
-        this.codec = new PhoenixIndexCodec(conf, regionStartKey, regionEndKey, tableName);
+        this.regionStartKey = regionStartKey;
+        this.regionEndKey = regionEndKey;
+        this.tableName = tableName;
+        this.codec = new PhoenixIndexCodec(conf, tableName);
     }
 
-    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName) {
-        this(conf, indexMetaData, tableName, null, null);
+    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+        this(null, conf, indexMetaData, tableName, regionStartKey, regionEndKey);
+    }
+
+    public PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, PhoenixIndexMetaData indexMetaData, byte[] tableName) {
+        this(services, services.getConfiguration(), indexMetaData, tableName, null, null);
     }
 
     private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
@@ -178,7 +192,7 @@ public class PhoenixTxIndexMutationGenerator {
             scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
             ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, null, true, -1);
             scanRanges.initializeScan(scan);
-            Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, isImmutable);
+            Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, true);
             // For rollback, we need to see all versions, including
             // the last committed version as there may be multiple
             // checkpointed versions.
@@ -312,7 +326,18 @@ public class PhoenixTxIndexMutationGenerator {
     private void generateDeletes(PhoenixIndexMetaData indexMetaData,
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             byte[] attribValue, TxTableState state) throws IOException {
-        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+        byte[] regionStartKey = this.regionStartKey;
+        byte[] regionEndKey = this.regionEndKey;
+        if (services != null && indexMetaData.hasLocalIndexes()) {
+            try {
+                HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey());
+                regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
+                regionEndKey = tableRegionLocation.getRegionInfo().getEndKey();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData, regionStartKey, regionEndKey);
         for (IndexUpdate delete : deletes) {
             if (delete.isValid()) {
                 delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
@@ -327,7 +352,18 @@ public class PhoenixTxIndexMutationGenerator {
             TxTableState state)
             throws IOException {
         state.applyMutation();
-        Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+        byte[] regionStartKey = this.regionStartKey;
+        byte[] regionEndKey = this.regionEndKey;
+        if (services != null && indexMetaData.hasLocalIndexes()) {
+            try {
+                HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey());
+                regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
+                regionEndKey = tableRegionLocation.getRegionInfo().getEndKey();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+        Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData, regionStartKey, regionEndKey);
         boolean validPut = false;
         for (IndexUpdate put : puts) {
             if (put.isValid()) {
@@ -475,8 +511,8 @@ public class PhoenixTxIndexMutationGenerator {
         };
         try {
             PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes);
-            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData,
-                    table.getPhysicalName().getBytes());
+            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices(), connection.getQueryServices().getConfiguration(), indexMetaData,
+                    table.getPhysicalName().getBytes(), null, null);
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }

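The per-row services.getTableRegionLocation(tableName, rowKey) lookup exists because a local index row must land in the same region as the data row it indexes, and when the generator runs client-side that region can only be determined per row key. A simplified sketch of why the region start key matters; Phoenix's actual local-index row-key layout is more involved, so take the prefixing below as illustrative:

    public class LocalIndexKeySketch {
        // Prefix the index entry with its data region's start key so the index
        // row sorts into (and stays co-located with) that same region.
        static byte[] localIndexRowKey(byte[] regionStartKey, byte[] indexedValue) {
            byte[] key = new byte[regionStartKey.length + indexedValue.length];
            System.arraycopy(regionStartKey, 0, key, 0, regionStartKey.length);
            System.arraycopy(indexedValue, 0, key, regionStartKey.length, indexedValue.length);
            return key;
        }
    }
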
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index 968bdab..0b7d9ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -58,7 +58,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
             Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
             meth.setAccessible(true);
             this.codec = meth.newInstance();
-            this.codec.initialize(conf, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey(), env.getRegion().getRegionInfo().getTable().getName());
+            this.codec.initialize(conf, env.getRegion().getRegionInfo().getTable().getName());
         } catch (Exception e) {
             throw new IOException(e);
         }

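This is also why the codec API change ripples through BaseIndexBuilder: the codec is created reflectively via a no-arg constructor and configured afterwards, so anything region-specific now has to arrive through method parameters rather than construction-time state. A stripped-down sketch of the loading pattern, with an illustrative Codec interface:

    import java.lang.reflect.Constructor;

    interface Codec {
        void initialize(byte[] tableName);
    }

    class CodecLoader {
        static Codec load(Class<? extends Codec> codecClass, byte[] tableName) throws Exception {
            Constructor<? extends Codec> ctor = codecClass.getDeclaredConstructor();
            ctor.setAccessible(true); // tolerate non-public constructors
            Codec codec = ctor.newInstance();
            codec.initialize(tableName);
            return codec;
        }
    }
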
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index 7dde941..0b70de3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -39,10 +39,12 @@ public interface IndexCodec {
      *            the current state of the table that needs to be cleaned up. Generally, you only care about the latest
      *            column values, for each column you are indexing for each index table.
      * @param context TODO
+     * @param regionStartKey TODO
+     * @param regionEndKey TODO
      * @return the pairs of (deletes, index table name) that should be applied.
      * @throws IOException
      */
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException;
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException;
 
     // table state has the pending update already applied, before calling
     // get the new index entries
@@ -59,10 +61,12 @@ public interface IndexCodec {
      *            the current state of the table that needs an index update. Generally, you only care about the latest
      *            column values, for each column you are indexing for each index table.
      * @param context TODO
+     * @param regionStartKey TODO
+     * @param regionEndKey TODO
      * @return the pairs of (updates, index table name) that should be applied.
      * @throws IOException
      */
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException;
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException;
 
     /**
      * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
@@ -80,5 +84,5 @@ public interface IndexCodec {
      */
     public boolean isEnabled(Mutation m) throws IOException;
 
-    public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName);
+    public void initialize(Configuration conf, byte[] tableName);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 59a8a2b..54d7f87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -125,7 +125,7 @@ public class LocalTableState implements TableState {
      * state for any of the columns you are indexing.
      * <p>
      * <i>NOTE:</i> This method should <b>not</b> be used during
-     * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been
+     * {@link IndexCodec#getIndexDeletes(TableState, BatchState, byte[], byte[])} as the pending update will not yet have been
      * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
      * need to track the indexed columns.
      * <p>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 60f86d4..1e4da87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -183,7 +183,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             throws IOException {
 
         // get the index updates for this current batch
-        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData);
+        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey());
         state.resetTrackedColumns();
 
         /*
@@ -221,7 +221,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
     }
 
     /**
-     * Get the index deletes from the codec {@link 
IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
+     * Get the index deletes from the codec {@link 
IndexCodec#getIndexDeletes(TableState, IndexMetaData, byte[], byte[])} and then 
add them to the
      * update map.
      * <p>
      * Expects the {@link LocalTableState} to already be correctly setup 
(correct timestamp, updates applied, etc).
@@ -231,7 +231,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      */
     protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
             throws IOException {
-        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData);
+        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey());
         if (cleanup != null) {
             for (IndexUpdate d : cleanup) {
                 if (!d.isValid()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 3416b21..ffbf98f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -101,7 +101,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -184,9 +184,24 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) {
         List<PTable> indexes = dataTable.getIndexes();
-        serialize(dataTable, ptr, indexes, connection);
+        serializeServerMaintainedIndexes(dataTable, ptr, indexes, connection);
     }
 
+    public static void serializeServerMaintainedIndexes(PTable dataTable, ImmutableBytesWritable ptr,
+            List<PTable> indexes, PhoenixConnection connection) {
+        Iterator<PTable> indexesItr = Collections.emptyListIterator();
+        boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional();
+        if (onlyLocalIndexes) {
+            if (!dataTable.isTransactional()
+                    || !dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) {
+                indexesItr = maintainedLocalIndexes(indexes.iterator());
+            }
+        } else {
+            indexesItr = maintainedIndexes(indexes.iterator());
+        }
+                indexesItr = maintainedLocalIndexes(indexes.iterator());
+            }
+        } else {
+            indexesItr = maintainedIndexes(indexes.iterator());
+        }
+    
+        serialize(dataTable, ptr, Lists.newArrayList(indexesItr), connection);
+    }
     /**
      * For client-side to serialize all IndexMaintainers for a given table
      * @param dataTable data table
@@ -195,22 +210,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
             List<PTable> indexes, PhoenixConnection connection) {
-        Iterator<PTable> indexesItr;
-        boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional();
-        if (onlyLocalIndexes) {
-            indexesItr = maintainedLocalIndexes(indexes.iterator());
-        } else {
-            indexesItr = maintainedIndexes(indexes.iterator());
-        }
-        if (!indexesItr.hasNext()) {
+        if (indexes.isEmpty()) {
             ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
             return;
         }
-        int nIndexes = 0;
-        while (indexesItr.hasNext()) {
-            nIndexes++;
-            indexesItr.next();
-        }
+        int nIndexes = indexes.size();
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(stream);
         try {
@@ -218,11 +222,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1));
             // Write out data row key schema once, since it's the same for all index maintainers
             dataTable.getRowKeySchema().write(output);
-            indexesItr = onlyLocalIndexes 
-                        ? maintainedLocalIndexes(indexes.iterator())
-                        : maintainedIndexes(indexes.iterator());
-            while (indexesItr.hasNext()) {
-                    org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection));
+            for (PTable index : indexes) {
+                    org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(index.getIndexMaintainer(dataTable, connection));
                     byte[] protoBytes = proto.toByteArray();
                     WritableUtils.writeVInt(output, protoBytes.length);
                     output.write(protoBytes);
@@ -285,31 +286,33 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
 
     private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) {
-        ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
-        DataInput input = new DataInputStream(stream);
         List<IndexMaintainer> maintainers = Collections.emptyList();
-        try {
-            int size = WritableUtils.readVInt(input);
-            boolean isDataTableSalted = size < 0;
-            size = Math.abs(size);
-            RowKeySchema rowKeySchema = new RowKeySchema();
-            rowKeySchema.readFields(input);
-            maintainers = Lists.newArrayListWithExpectedSize(size);
-            for (int i = 0; i < size; i++) {
-                if (useProtoForIndexMaintainer) {
-                  int protoSize = WritableUtils.readVInt(input);
-                  byte[] b = new byte[protoSize];
-                  input.readFully(b);
-                  org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
-                  maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
-                } else {
-                    IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
-                    maintainer.readFields(input);
-                    maintainers.add(maintainer);
+        if (length > 0) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+            DataInput input = new DataInputStream(stream);
+            try {
+                int size = WritableUtils.readVInt(input);
+                boolean isDataTableSalted = size < 0;
+                size = Math.abs(size);
+                RowKeySchema rowKeySchema = new RowKeySchema();
+                rowKeySchema.readFields(input);
+                maintainers = Lists.newArrayListWithExpectedSize(size);
+                for (int i = 0; i < size; i++) {
+                    if (useProtoForIndexMaintainer) {
+                      int protoSize = WritableUtils.readVInt(input);
+                      byte[] b = new byte[protoSize];
+                      input.readFully(b);
+                      org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
+                      maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                    } else {
+                        IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
+                        maintainer.readFields(input);
+                        maintainers.add(maintainer);
+                    }
                 }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
             }
-        } catch (IOException e) {
-            throw new RuntimeException(e); // Impossible
         }
         return maintainers;
     }

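The serialize/deserialize pair above reads and writes a compact stream: a vint count whose sign doubles as the "data table is salted" flag, the row key schema once, then one length-prefixed protobuf blob per maintainer, with an empty byte array now standing for "nothing to maintain". A sketch of that layout using plain DataOutputStream ints where the real code uses Hadoop vints:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;

    public class MaintainerWireFormatSketch {
        static byte[] serialize(boolean salted, byte[] rowKeySchemaBytes,
                List<byte[]> protoBlobs) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(protoBlobs.size() * (salted ? -1 : 1)); // sign carries salt flag
            out.write(rowKeySchemaBytes);                        // schema written once
            for (byte[] blob : protoBlobs) {
                out.writeInt(blob.length);                       // length prefix
                out.write(blob);
            }
            out.flush();
            return bytes.toByteArray();
        }
    }
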
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 04dcbc4..bb50fa1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -36,8 +36,8 @@ import com.google.common.collect.Sets;
 
 /**
  * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index (
- * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be (
- * {@link #getIndexUpserts(TableState, IndexMetaData)}).
+ * {@link #getIndexDeletes(TableState, IndexMetaData, byte[], byte[])}) as well as what the new index state should be (
+ * {@link #getIndexUpserts(TableState, IndexMetaData, byte[], byte[])}).
  */
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
@@ -46,23 +46,19 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MAINTAINERS = "IndexMaintainers";
     public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
     
-    private byte[] regionStartKey;
-    private byte[] regionEndKey;
     private byte[] tableName;
     
     public PhoenixIndexCodec() {
         
     }
 
-    public PhoenixIndexCodec(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
-        initialize(conf, regionStartKey, regionEndKey, tableName);
+    public PhoenixIndexCodec(Configuration conf, byte[] tableName) {
+        initialize(conf, tableName);
     }
     
 
     @Override
-    public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
-        this.regionStartKey = regionStartKey;
-        this.regionEndKey = regionEndKey;
+    public void initialize(Configuration conf, byte[] tableName) {
         this.tableName = tableName;
     }
 
@@ -74,7 +70,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
         List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
@@ -97,7 +93,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
         List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 2ddd659..73737b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -100,7 +100,7 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc
     public void start(CoprocessorEnvironment e) throws IOException {
         final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
         String serverName = env.getServerName().getServerName();
-        codec = new PhoenixIndexCodec(env.getConfiguration(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName());
+        codec = new PhoenixIndexCodec(env.getConfiguration(), env.getRegionInfo().getTable().getName());
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
         // setup the actual index writer
         // For transactional tables, we keep the index active upon a write 
failure

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index e2ea759..f02e9d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -90,7 +90,10 @@ public class ScanningResultIterator implements ResultIterator {
     private void getScanMetrics() {
 
         if (!scanMetricsUpdated && scanMetricsEnabled) {
-            ScanMetrics scanMetrics = scanner.getScanMetrics();
+            ScanMetrics scanMetrics = scan.getScanMetrics();
+            if (scanMetrics == null) {
+                return;
+            }
             Map<String, Long> scanMetricsMap = scanMetrics.getMetricsMap();
             if(scanMetricsMap == null) {
                 return;

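The fix reads metrics off the Scan rather than the scanner and guards for null, because HBase only populates ScanMetrics when they were enabled for that scan. The defensive shape, using the public client API:

    import java.util.Map;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

    class ScanMetricsGuard {
        static void report(Scan scan) {
            ScanMetrics metrics = scan.getScanMetrics();
            if (metrics == null) {
                return; // metrics were never enabled for this scan
            }
            Map<String, Long> map = metrics.getMetricsMap();
            if (map == null) {
                return; // nothing collected yet
            }
            map.forEach((name, value) -> System.out.println(name + "=" + value));
        }
    }
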
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 9599b1f..267aa1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -51,7 +51,6 @@ import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
@@ -193,7 +192,7 @@ public class TableResultIterator implements ResultIterator {
                         if (retry <= 0) {
                             throw e1;
                         }
-                        Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
+                        Long cacheId = e1.getCacheId();
                         retry--;
                         try {
                             ServerCache cache = caches == null ? null :

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 11a8176..6100b20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -26,11 +26,17 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * Writes mutations directly to HBase using HBase front-door APIs.
  */
@@ -65,7 +71,24 @@ public class DirectHTableWriter {
 
     public void write(List<Mutation> mutations) throws IOException, InterruptedException {
         Object[] results = new Object[mutations.size()];
-        table.batch(mutations, results);
+        String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+        if (txnIdStr == null) {
+            table.batch(mutations, results);
+        } else {
+            long ts = Long.parseLong(txnIdStr);
+            PhoenixTransactionProvider provider = TransactionFactory.Provider.getDefault().getTransactionProvider();
+            String txnProviderStr = conf.get(PhoenixConfigurationUtil.TX_PROVIDER);
+            if (txnProviderStr != null) {
+                provider = TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider();
+            }
+            List<Mutation> shadowedMutations = Lists.newArrayListWithExpectedSize(mutations.size());
+            for (Mutation m : mutations) {
+                if (m instanceof Put) {
+                    shadowedMutations.add(provider.markPutAsCommitted((Put)m, ts, ts));
+                }
+            }
+            table.batch(shadowedMutations, results);
+        }
     }
 
     protected Configuration getConf() {

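The write path now branches on TX_SCN_VALUE: with a transaction timestamp present, each Put is rewritten as already-committed before batching, using the provider named by the new TX_PROVIDER key and falling back to the factory default for jobs that never set it. That resolution step, isolated as a sketch built only from calls appearing in this diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.transaction.PhoenixTransactionProvider;
    import org.apache.phoenix.transaction.TransactionFactory;

    class TxProviderResolver {
        static PhoenixTransactionProvider resolve(Configuration conf) {
            PhoenixTransactionProvider provider =
                    TransactionFactory.Provider.getDefault().getTransactionProvider();
            String name = conf.get(PhoenixConfigurationUtil.TX_PROVIDER);
            if (name != null) {
                // an explicit name (set by IndexTool below) wins over the default
                provider = TransactionFactory.Provider.valueOf(name).getTransactionProvider();
            }
            return provider;
        }
    }

Note that the transactional branch forwards only Puts to table.batch(...); Deletes are dropped, which appears intentional for an initial index build whose source scan yields only upserts.
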
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 94d5e73..c7e7b88 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -383,6 +383,7 @@ public class IndexTool extends Configured implements Tool {
             if (pdataTable.isTransactional()) {
                 configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
                     Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
+                configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pdataTable.getTransactionProvider().name());
             }
             configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
                 Long.toString(maxTimeRange));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 6f469e6..8318adf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -21,28 +21,37 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixJobCounters;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * Mapper that hands over rows from data table to the index table.
  *
@@ -95,6 +104,18 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
        
         context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
         
+        PhoenixTransactionProvider provider = null;
+        Configuration conf = context.getConfiguration();
+        long ts = HConstants.LATEST_TIMESTAMP;
+        String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+        if (txnIdStr != null) {
+            ts = Long.parseLong(txnIdStr);
+            provider = TransactionFactory.Provider.getDefault().getTransactionProvider();
+            String txnProviderStr = conf.get(PhoenixConfigurationUtil.TX_PROVIDER);
+            if (txnProviderStr != null) {
+                provider = TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider();
+            }
+        }
         try {
            final ImmutableBytesWritable outputKey = new 
ImmutableBytesWritable();
            final List<Object> values = record.getValues();
@@ -102,16 +123,30 @@ public class PhoenixIndexImportMapper extends 
Mapper<NullWritable, PhoenixIndexD
            indxWritable.write(this.pStatement);
            this.pStatement.execute();
             
-           final Iterator<Pair<byte[], List<Cell>>> uncommittedDataIterator = 
PhoenixRuntime.getUncommittedDataIterator(connection, true);
-           while (uncommittedDataIterator.hasNext()) {
-                Pair<byte[], List<Cell>> kvPair = 
uncommittedDataIterator.next();
-                if (Bytes.compareTo(Bytes.toBytes(indexTableName), 
kvPair.getFirst()) != 0) {
+           PhoenixConnection pconn = 
connection.unwrap(PhoenixConnection.class);
+           final Iterator<Pair<byte[],List<Mutation>>> iterator = 
pconn.getMutationState().toMutations(true);
+           while (iterator.hasNext()) {
+                Pair<byte[], List<Mutation>> pair = iterator.next();
+                if (Bytes.compareTo(Bytes.toBytes(indexTableName), 
pair.getFirst()) != 0) {
                     // skip edits for other tables
                     continue;
                 }
-                List<Cell> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), 
keyValueList);
-                for (Cell kv : keyValueList) {
+                List<Cell> keyValues = 
Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // 
Rough estimate: 5 key values per row
+                for (Mutation mutation : pair.getSecond()) {
+                    if (mutation instanceof Put) {
+                        if (provider != null) {
+                            mutation = 
provider.markPutAsCommitted((Put)mutation, ts, ts);
+                        }
+                        for (List<Cell> cellList : 
mutation.getFamilyCellMap().values()) {
+                            List<Cell> keyValueList = 
preUpdateProcessor.preUpsert(mutation.getRow(), cellList);
+                            for (Cell keyValue : keyValueList) {
+                                keyValues.add(keyValue);
+                            }
+                        }
+                    }
+                }
+                Collections.sort(keyValues, 
pconn.getKeyValueBuilder().getKeyValueComparator());
+                for (Cell kv : keyValues) {
                     outputKey.set(kv.getRowArray(), kv.getRowOffset(), 
kv.getRowLength());
                     context.write(outputKey, 
PhoenixKeyValueUtil.maybeCopyCell(kv));
                 }
@@ -137,4 +172,4 @@ public class PhoenixIndexImportMapper extends 
Mapper<NullWritable, PhoenixIndexD
             }
          }
     }
-}
\ No newline at end of file
+}

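For reference, a minimal driver-side sketch of the wiring the mapper now
expects: the job configuration carries both the transaction snapshot timestamp
and the provider name before submission. The job variable, the txnId value,
and the use of an OMID enum constant are illustrative assumptions, not part of
this patch; if TX_PROVIDER is absent the mapper falls back to
TransactionFactory.Provider.getDefault().

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.transaction.TransactionFactory;

    // Illustrative driver-side setup; txnId would come from the transaction
    // opened for the index build.
    Configuration conf = job.getConfiguration();
    conf.set(PhoenixConfigurationUtil.TX_SCN_VALUE, Long.toString(txnId));
    conf.set(PhoenixConfigurationUtil.TX_PROVIDER,
            TransactionFactory.Provider.OMID.name());
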
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 909c85c..35f82c1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -105,6 +105,8 @@ public final class PhoenixConfigurationUtil {
     
     public static final String TX_SCN_VALUE = "phoenix.mr.txscn.value";
     
+    public static final String TX_PROVIDER = "phoenix.mr.txprovider";
+
     /** Configuration key for the class name of an 
ImportPreUpsertKeyValueProcessor */
     public static final String UPSERT_HOOK_CLASS_CONFKEY = 
"phoenix.mapreduce.import.kvprocessor";
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 5394d05..942e77f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -169,7 +169,7 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
 
     public QueryLoggerDisruptor getQueryDisruptor();
     
-    public PhoenixTransactionClient 
initTransactionClient(TransactionFactory.Provider provider);
+    public PhoenixTransactionClient 
initTransactionClient(TransactionFactory.Provider provider) throws SQLException;
     
     /**
      * Writes a cell to SYSTEM.MUTEX using checkAndPut to ensure only a single 
client can execute a
@@ -184,4 +184,4 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
      */
     public void deleteMutexCell(String tenantId, String schemaName, String 
tableName,
             String columnName, String familyName) throws SQLException;
-}
\ No newline at end of file
+}

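Since initTransactionClient can now fail at construction time, callers must
handle or propagate the checked exception. A minimal caller-side sketch; the
helper method and its arguments are illustrative, not part of this interface:

    import java.sql.SQLException;
    import org.apache.phoenix.query.ConnectionQueryServices;
    import org.apache.phoenix.transaction.PhoenixTransactionClient;
    import org.apache.phoenix.transaction.TransactionFactory;

    static PhoenixTransactionClient txClient(ConnectionQueryServices services,
            TransactionFactory.Provider provider) throws SQLException {
        // Propagate the new checked exception rather than wrapping it in a
        // RuntimeException; the implementation caches one client per provider.
        return services.initTransactionClient(provider);
    }
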
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f1e3d82..08f7310 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -152,7 +152,6 @@ import 
org.apache.phoenix.coprocessor.MetaDataRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.coprocessor.SequenceRegionObserver;
 import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
-import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
@@ -242,6 +241,7 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.transaction.PhoenixTransactionClient;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.ByteUtil;
@@ -868,10 +868,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             
if(!newDesc.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 
builder.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 
priority, null);
             }
-            // For ALTER TABLE
-            boolean nonTxToTx = 
Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
-            boolean isTransactional =
-                    
Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || 
nonTxToTx;
+            TransactionFactory.Provider provider = 
getTransactionProvider(tableProps);
+            boolean isTransactional = (provider != null);
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing 
coprocessor for indexes.
             // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table 
because we use
@@ -934,22 +932,27 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             }
 
             if (isTransactional) {
-                TransactionFactory.Provider provider = 
(TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
-                if (provider == null) {
-                    String providerValue = 
this.props.get(QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, 
QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER);
-                    provider = 
(TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(providerValue);
-                }
                 Class<? extends RegionObserver> coprocessorClass = 
provider.getTransactionProvider().getCoprocessor();
                 if (!newDesc.hasCoprocessor(coprocessorClass.getName())) {
                     builder.addCoprocessor(coprocessorClass.getName(), null, 
priority - 10, null);
                 }
+                Class<? extends RegionObserver> coprocessorGCClass = 
provider.getTransactionProvider().getGCCoprocessor();
+                if (coprocessorGCClass != null) {
+                    if (!newDesc.hasCoprocessor(coprocessorGCClass.getName())) 
{
+                        builder.addCoprocessor(coprocessorGCClass.getName(), 
null, priority - 10, null);
+                    }
+                }
             } else {
                 // Remove all potential transactional coprocessors
-                for (TransactionFactory.Provider provider : 
TransactionFactory.Provider.values()) {
-                    Class<? extends RegionObserver> coprocessorClass = 
provider.getTransactionProvider().getCoprocessor();
+                for (TransactionFactory.Provider aprovider : 
TransactionFactory.Provider.values()) {
+                    Class<? extends RegionObserver> coprocessorClass = 
aprovider.getTransactionProvider().getCoprocessor();
+                    Class<? extends RegionObserver> coprocessorGCClass = 
aprovider.getTransactionProvider().getGCCoprocessor();
                     if (coprocessorClass != null && 
newDesc.hasCoprocessor(coprocessorClass.getName())) {
                         builder.removeCoprocessor(coprocessorClass.getName());
                     }
+                    if (coprocessorGCClass != null && 
newDesc.hasCoprocessor(coprocessorGCClass.getName())) {
+                        
builder.removeCoprocessor(coprocessorGCClass.getName());
+                    }
                 }
             }
         } catch (IOException e) {
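
For context, getGCCoprocessor() is a provider-level hook that lets a
transaction provider contribute a second, garbage-collection-focused
coprocessor alongside its transactional one. A plausible provider-side sketch,
assuming OmidGCProcessor (added elsewhere in this commit) is that coprocessor
for Omid; a provider with no compaction-time GC would return null, which the
install/remove logic above skips:

    import org.apache.hadoop.hbase.coprocessor.RegionObserver;
    import org.apache.phoenix.coprocessor.OmidGCProcessor;

    @Override
    public Class<? extends RegionObserver> getGCCoprocessor() {
        // A null return means no GC coprocessor is installed, and any
        // previously installed one is removed when the table is altered
        // to be non-transactional.
        return OmidGCProcessor.class;
    }
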
@@ -957,6 +960,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    private TransactionFactory.Provider 
getTransactionProvider(Map<String,Object> tableProps) {
+        TransactionFactory.Provider provider = 
(TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
+        return provider;
+    }
+    
     private static interface RetriableOperation {
         boolean checkForCompletion() throws TimeoutException, IOException;
         String getOperationName();
@@ -1177,15 +1185,30 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 if (!modifyExistingMetaData) {
                     return existingDesc; // Caller already knows that no 
metadata was changed
                 }
-                boolean willBeTx = 
Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name()));
+                TransactionFactory.Provider provider = 
getTransactionProvider(props);
+                boolean willBeTx = provider != null;
                 // If mapping an existing table as transactional, set property 
so that existing
                 // data is correctly read.
                 if (willBeTx) {
-                    
newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, 
Boolean.TRUE.toString());
+                    if (!equalTxCoprocessor(provider, existingDesc, 
newDesc.build())) {
+                        // Cannot switch between different providers
+                        if (hasTxCoprocessor(existingDesc)) {
+                            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SWITCH_TXN_PROVIDERS)
+                            
.setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName))
+                            
.setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException();
+                        }
+                        if 
(provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALTER_NONTX_TO_TX))
 {
+                            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL)
+                            .setMessage(provider.name())
+                            
.setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName))
+                            
.setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException();
+                        }
+                        
newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, 
Boolean.TRUE.toString());
+                    }
                 } else {
                     // If we think we're creating a non transactional table 
when it's already
                     // transactional, don't allow.
-                    if 
(existingDesc.hasCoprocessor(TephraTransactionalProcessor.class.getName())) {
+                    if (hasTxCoprocessor(existingDesc)) {
                         throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
                         
.setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName))
                         
.setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException();
@@ -1219,6 +1242,21 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
         return null; // will never make it here
     }
+    
+    private static boolean hasTxCoprocessor(TableDescriptor descriptor) {
+        for (TransactionFactory.Provider provider : 
TransactionFactory.Provider.values()) {
+            Class<? extends RegionObserver> coprocessorClass = 
provider.getTransactionProvider().getCoprocessor();
+            if (coprocessorClass != null && 
descriptor.hasCoprocessor(coprocessorClass.getName())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean equalTxCoprocessor(TransactionFactory.Provider 
provider, TableDescriptor existingDesc, TableDescriptor newDesc) {
+        Class<? extends RegionObserver> coprocessorClass = 
provider.getTransactionProvider().getCoprocessor();
+        return (coprocessorClass != null && 
existingDesc.hasCoprocessor(coprocessorClass.getName()) && 
newDesc.hasCoprocessor(coprocessorClass.getName()));
+    }
 
     private void modifyTable(byte[] tableName, TableDescriptor newDesc, 
boolean shouldPoll) throws IOException,
     InterruptedException, TimeoutException, SQLException {
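
To illustrate the new guard, a hedged JDBC sketch (the URL and table name are
hypothetical): mapping a table onto a transaction provider different from the
one whose coprocessor it already carries is rejected, as is a
non-transactional-to-transactional ALTER for providers that report
ALTER_NONTX_TO_TX as unsupported:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
         Statement stmt = conn.createStatement()) {
        // Fails with CANNOT_SWITCH_TXN_PROVIDERS if T already carries a
        // different provider's transactional coprocessor, or with
        // CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL if the target provider
        // does not support converting existing non-transactional data.
        stmt.execute("ALTER TABLE T SET TRANSACTIONAL=true, TRANSACTION_PROVIDER='OMID'");
    } catch (SQLException e) {
        System.err.println(e.getErrorCode() + ": " + e.getMessage());
    }
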
@@ -1951,6 +1989,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             } else {
                 indexTableProps = Maps.newHashMapWithExpectedSize(1);
                 
indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, 
Boolean.valueOf(txValue));
+                
indexTableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, 
tableProps.get(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
             }
             for (PTable index : table.getIndexes()) {
                 TableDescriptor indexDesc = 
admin.getDescriptor(TableName.valueOf(index.getPhysicalName().getBytes()));
@@ -2073,6 +2112,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         boolean willBeTransactional = false;
         boolean isOrWillBeTransactional = isTransactional;
         Integer newTTL = null;
+        TransactionFactory.Provider txProvider = null;
         for (String family : properties.keySet()) {
             List<Pair<String, Object>> propsList = properties.get(family);
             if (propsList != null && propsList.size() > 0) {
@@ -2083,20 +2123,27 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     if ((MetaDataUtil.isHTableProperty(propName) ||  
TableProperty.isPhoenixTableProperty(propName)) && addingColumns) {
                         // setting HTable and PhoenixTable properties while 
adding a column is not allowed.
                         throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN)
-                        .setMessage("Property: " + propName).build()
+                        .setMessage("Property: " + propName)
+                        .setSchemaName(table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        .build()
                         .buildException();
                     }
                     if (MetaDataUtil.isHTableProperty(propName)) {
                         // Can't have a column family name for a property 
that's an HTableProperty
                         if 
(!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
                             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY)
-                            .setMessage("Column Family: " + family + ", 
Property: " + propName).build()
+                            .setMessage("Column Family: " + family + ", 
Property: " + propName)
+                            .setSchemaName(table.getSchemaName().getString())
+                            .setTableName(table.getTableName().getString())
+                            .build()
                             .buildException();
                         }
                         tableProps.put(propName, propValue);
                     } else {
                         if (TableProperty.isPhoenixTableProperty(propName)) {
-                            TableProperty.valueOf(propName).validate(true, 
!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
+                            TableProperty tableProp = 
TableProperty.valueOf(propName);
+                            tableProp.validate(true, 
!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
                             if (propName.equals(TTL)) {
                                 newTTL = ((Number)prop.getSecond()).intValue();
                                 // Even though TTL is really a HColumnProperty 
we treat it specially.
@@ -2105,6 +2152,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             } else if 
(propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && 
Boolean.TRUE.equals(propValue)) {
                                 willBeTransactional = isOrWillBeTransactional 
= true;
                                 
tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue);
+                            } else if 
(propName.equals(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER) && propValue != 
null) {
+                                willBeTransactional = isOrWillBeTransactional 
= true;
+                                
tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE);
+                                txProvider = 
(Provider)TableProperty.TRANSACTION_PROVIDER.getValue(propValue);
+                                
tableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, txProvider);
                             }
                         } else {
                             if (MetaDataUtil.isHColumnProperty(propName)) {
@@ -2118,12 +2170,26 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                                 // FIXME: This isn't getting triggered as 
currently a property gets evaluated
                                 // as HTableProp if it's neither HColumnProp nor 
PhoenixTableProp.
                                 throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY)
-                                .setMessage("Column Family: " + family + ", 
Property: " + propName).build()
+                                .setMessage("Column Family: " + family + ", 
Property: " + propName)
+                                
.setSchemaName(table.getSchemaName().getString())
+                                .setTableName(table.getTableName().getString())
+                                .build()
                                 .buildException();
                             }
                         }
                     }
                 }
+                if (isOrWillBeTransactional && newTTL != null) {
+                    TransactionFactory.Provider isOrWillBeTransactionProvider 
= txProvider == null ? table.getTransactionProvider() : txProvider;
+                    if 
(isOrWillBeTransactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL))
 {
+                        throw new 
SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
+                        .setMessage(isOrWillBeTransactionProvider.name())
+                        .setSchemaName(table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        .build()
+                        .buildException();
+                    }
+                }
                 if (!colFamilyPropsMap.isEmpty()) {
                     stmtFamiliesPropsMap.put(family, colFamilyPropsMap);
                 }
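
A sketch of the provider-side feature flag consulted above; which features a
given provider rejects is provider-specific, so the body below is an
assumption for illustration only:

    // Hypothetical PhoenixTransactionProvider implementation: reporting
    // SET_TTL as unsupported makes the ALTER path above reject TTL changes
    // on tables that use (or are being switched to) this provider.
    @Override
    public boolean isUnsupported(PhoenixTransactionProvider.Feature feature) {
        return feature == PhoenixTransactionProvider.Feature.SET_TTL;
    }
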
@@ -4761,7 +4827,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     }
 
     @Override
-    public synchronized PhoenixTransactionClient 
initTransactionClient(Provider provider) {
+    public synchronized PhoenixTransactionClient 
initTransactionClient(Provider provider) throws SQLException {
         PhoenixTransactionClient client = txClients[provider.ordinal()];
         if (client == null) {
             client = txClients[provider.ordinal()] = 
provider.getTransactionProvider().getTransactionClient(config, connectionInfo);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 57ff6e8..57af9e0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -368,7 +368,7 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     }
     
     @Override
-    public PhoenixTransactionClient initTransactionClient(Provider provider) {
+    public PhoenixTransactionClient initTransactionClient(Provider provider) 
throws SQLException {
         return getDelegate().initTransactionClient(provider);
     }
 
