http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 4f08846..61be561 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -109,7 +109,6 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -541,7 +540,11 @@ public class UpsertCompiler {
                 // Disable running upsert select on server side if a table has global mutable secondary indexes on it
                 boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows();
                 boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery();
-                runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit && !table.isTransactional()
+                runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit
+                        // We can run the upsert select for initial index population on server side for transactional
+                        // tables since the writes do not need to be done transactionally, since we gate the index
+                        // usage on successfully writing all data rows.
+                        && (!table.isTransactional() || table.getType() == PTableType.INDEX)
                         && !(table.isImmutableRows() && !table.getIndexes().isEmpty())
                         && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1;
             }
@@ -1039,12 +1042,15 @@ public class UpsertCompiler {
             byte[] txState = table.isTransactional() ?
                     connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
 
+            ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+            if (aggPlan.getTableRef().getTable().isTransactional()
+                    || (table.getType() == PTableType.INDEX && table.isTransactional())) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+            }
             if (ptr.getLength() > 0) {
                 byte[] uuidValue = ServerCacheClient.generateId();
                 scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                 scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
-                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
             }
             ResultIterator iterator = aggPlan.iterator();
             try {

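Condensed, the new gate reads as follows (a paraphrase of the hunk above for readability, not the committed code; all locals are the compiler's own). The relaxation is safe because an index under initial population is only marked active after every data row has been written successfully, so those writes need no transactional guarantees:

    boolean txnOkOnServer = !table.isTransactional()
            || table.getType() == PTableType.INDEX; // initial index population
    runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes))
            && isAutoCommit
            && txnOkOnServer
            && !(table.isImmutableRows() && !table.getIndexes().isEmpty())
            && !select.isJoin() && !hasWhereSubquery
            && table.getRowTimestampColPos() == -1;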
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
new file mode 100644
index 0000000..d90a31e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+
+
+public class OmidGCProcessor extends DelegateRegionObserver {
+
+    public OmidGCProcessor() {
+        super(new BaseRegionObserver() {
+        });
+    }
+
+}

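Both new Omid coprocessors lean on Phoenix's existing DelegateRegionObserver, which forwards every RegionObserver callback to a wrapped instance; handing it a bare BaseRegionObserver makes them no-op placeholders that provider-specific logic can later replace. A minimal sketch of the forwarding pattern (hypothetical class; HBase 1.x coprocessor API assumed):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    public class ForwardingRegionObserver extends BaseRegionObserver {
        private final BaseRegionObserver delegate;

        protected ForwardingRegionObserver(BaseRegionObserver delegate) {
            this.delegate = delegate;
        }

        @Override
        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put,
                WALEdit edit, Durability durability) throws IOException {
            // every hook forwards to the wrapped observer like this
            delegate.prePut(c, put, edit, durability);
        }
        // ... all other RegionObserver callbacks forwarded the same way
    }

OmidTransactionalProcessor (next diff) is identical in shape, differing only in name.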
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
new file mode 100644
index 0000000..f977cfe
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+
+
+public class OmidTransactionalProcessor extends DelegateRegionObserver {
+
+    public OmidTransactionalProcessor() {
+        super(new BaseRegionObserver() {
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c325d70..a667316 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -130,6 +130,8 @@ import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -429,6 +431,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES);
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        PhoenixTransactionProvider txnProvider = null;
+        if (txState != null) {
+            int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+            txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
+        }
         List<Expression> selectExpressions = null;
         byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
         boolean isUpsert = false;
@@ -535,7 +543,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 useIndexProto = false;
             }
     
-            byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
             if(needToWrite) {
                 synchronized (lock) {
                     if (isRegionClosingOrSplitting) {
@@ -658,6 +665,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                         valueGetter, ptr, results.get(0).getTimestamp(),
                                         env.getRegion().getRegionInfo().getStartKey(),
                                         env.getRegion().getRegionInfo().getEndKey());
+
+                                    if (txnProvider != null) {
+                                        put = txnProvider.markPutAsCommitted(put, ts, ts);
+                                    }
                                     indexMutations.add(put);
                                 }
                             }
@@ -725,6 +736,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             for (Mutation mutation : row.toRowMutations()) {
                                 if (replayMutations != null) {
                                     mutation.setAttribute(REPLAY_WRITES, replayMutations);
+                                } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) {
+                                    mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
                                 }
                                 mutations.add(mutation);
                             }

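Putting the two halves together: the client (UpsertCompiler, above) now always ships its version on the scan and ships TX_STATE when the target is transactional, and the coprocessor decodes both, resolves a transaction provider, and uses it to mark index puts as committed at the write timestamp so readers can accept those cells without consulting the transaction manager. A condensed sketch using only names from these hunks (ts is the write timestamp of the data row):

    byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
    byte[] versionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
    PhoenixTransactionProvider txnProvider = null;
    if (txState != null) {
        int clientVersion = versionBytes == null
                ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(versionBytes);
        txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
    }
    // ... later, for each index put built from a data row at timestamp ts:
    if (txnProvider != null) {
        put = txnProvider.markPutAsCommitted(put, ts, ts);
    }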
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index d6a70f2..c0a81ec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -297,7 +297,12 @@ public enum SQLExceptionCode {
     UNKNOWN_TRANSACTION_PROVIDER(1089,"44A20", "Unknown TRANSACTION_PROVIDER: "),
     CANNOT_START_TXN_IF_TXN_DISABLED(1091, "44A22", "Cannot start transaction if transactions are disabled."),
     CANNOT_MIX_TXN_PROVIDERS(1092, "44A23", "Cannot mix transaction providers: "),
-    CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for "),
+    CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for"),
+    UNSUPPORTED_COLUMN_ENCODING_FOR_TXN_PROVIDER(1094, "44A25", "Column encoding is not supported for"),
+    UNSUPPORTED_STORAGE_FORMAT_FOR_TXN_PROVIDER(1095, "44A26", "Only ONE_CELL_PER_COLUMN storage scheme is supported for"),
+    CANNOT_SWITCH_TXN_PROVIDERS(1096, "44A27", "Cannot switch transaction providers."),
+    TTL_UNSUPPORTED_FOR_TXN_TABLE(10947, "44A28", "TTL is not supported for"),
+    CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE(10948, "44A29", "Local indexes cannot be created for"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

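For context, these codes surface through Phoenix's usual SQLExceptionInfo builder; a hypothetical call site (the table name is invented) would look like:

    throw new SQLExceptionInfo.Builder(SQLExceptionCode.TTL_UNSUPPORTED_FOR_TXN_TABLE)
            .setMessage(" table=MY_TXN_TABLE") // caller appends the subject
            .build().buildException();

Note the trailing space dropped from CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL's message, which makes it consistent with the new entries that all expect the caller to supply the subject.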
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
index 6b3f9ca..075df4d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -46,19 +46,14 @@ import com.google.protobuf.Message;
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
 
-public class DelegateHTable implements HTableInterface {
-    protected final HTableInterface delegate;
+public class DelegateHTable implements Table {
+    protected final Table delegate;
 
-    public DelegateHTable(HTableInterface delegate) {
+    public DelegateHTable(Table delegate) {
         this.delegate = delegate;
     }
 
     @Override
-    public byte[] getTableName() {
-        return delegate.getTableName();
-    }
-
-    @Override
     public TableName getName() {
         return delegate.getName();
     }
@@ -79,11 +74,6 @@ public class DelegateHTable implements HTableInterface {
     }
 
     @Override
-    public Boolean[] exists(List<Get> gets) throws IOException {
-        return delegate.exists(gets);
-    }
-
-    @Override
     public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
         delegate.batch(actions, results);
     }
@@ -117,12 +107,6 @@ public class DelegateHTable implements HTableInterface {
         return delegate.get(gets);
     }
 
-    @SuppressWarnings("deprecation")
-    @Override
-    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-        return delegate.getRowOrBefore(row, family);
-    }
-
     @Override
     public ResultScanner getScanner(Scan scan) throws IOException {
         return delegate.getScanner(scan);
@@ -195,23 +179,6 @@ public class DelegateHTable implements HTableInterface {
         return delegate.incrementColumnValue(row, family, qualifier, amount, durability);
     }
 
-    @SuppressWarnings("deprecation")
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-            throws IOException {
-        return delegate.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
-    }
-
-    @Override
-    public boolean isAutoFlush() {
-        return delegate.isAutoFlush();
-    }
-
-    @Override
-    public void flushCommits() throws IOException {
-        delegate.flushCommits();
-    }
-
     @Override
     public void close() throws IOException {
         delegate.close();
@@ -234,22 +201,6 @@ public class DelegateHTable implements HTableInterface {
         delegate.coprocessorService(service, startKey, endKey, callable, callback);
     }
 
-    @SuppressWarnings("deprecation")
-    @Override
-    public void setAutoFlush(boolean autoFlush) {
-        delegate.setAutoFlush(autoFlush);
-    }
-
-    @Override
-    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-        delegate.setAutoFlush(autoFlush, clearBufferOnFail);
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean autoFlush) {
-        delegate.setAutoFlushTo(autoFlush);
-    }
-
     @Override
     public long getWriteBufferSize() {
         return delegate.getWriteBufferSize();

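Moving from HTableInterface to Table also shrinks the surface a wrapper must implement, and DelegateHTable remains the base for interception-style wrappers. A hypothetical subclass (illustrative only) that logs writes:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;

    public class LoggingHTable extends DelegateHTable {
        public LoggingHTable(Table delegate) {
            super(delegate);
        }

        @Override
        public void put(Put put) throws IOException {
            // observe the call, then forward to the wrapped Table
            System.out.println("put to " + getName().getNameAsString());
            delegate.put(put);
        }
    }

Phoenix's transaction providers wrap tables in the same style to route reads and writes through the transaction context.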
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d2d1eea..14f13b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -39,15 +39,13 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.TraceScope;
-import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -294,10 +292,11 @@ public class MutationState implements SQLCloseable {
     // the Transaction outside of MutationState, this seems reasonable, as the member variables
     // would not change as these threads are running. We also clone mutationState to ensure that
     // the transaction context won't change due to a commit when auto commit is true.
-    public HTableInterface getHTable(PTable table) throws SQLException {
-        HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
+    public Table getHTable(PTable table) throws SQLException {
+        Table htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
-            htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows());
+            // We're only using this table for reading, so we want it wrapped even if it's an index
+            htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows() || table.getType() == PTableType.INDEX);
         }
         }
         return htable;
     }
@@ -533,7 +532,7 @@ public class MutationState implements SQLCloseable {
                             if (indexMutationsMap == null) {
                                 PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection, table,
                                         indexList, mutationsPertainingToIndex.get(0).getAttributesMap());
-                                try (HTableInterface htable = connection.getQueryServices().getTable(
+                                try (Table htable = connection.getQueryServices().getTable(
                                         table.getPhysicalName().getBytes())) {
                                     Collection<Pair<Mutation, byte[]>> allMutations = generator.getIndexUpdates(htable,
                                             mutationsPertainingToIndex.iterator());
@@ -562,8 +561,7 @@ public class MutationState implements SQLCloseable {
                         MultiRowMutationState multiRowMutationState = mutations.remove(key);
                         if (multiRowMutationState != null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState,
-                                    deleteMutations, null);
+                            generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
                             if (indexMutations == null) {
                                 indexMutations = deleteMutations;
                             } else {
@@ -960,7 +958,7 @@ public class MutationState implements SQLCloseable {
                     // region servers.
                     shouldRetry = cache != null;
                     SQLException sqlE = null;
-                    HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+                    Table hTable = connection.getQueryServices().getTable(htableName);
                     try {
                         if (table.isTransactional()) {
                             // Track tables to which we've sent uncommitted data
@@ -968,7 +966,12 @@
                                 uncommittedPhysicalNames.add(table.getPhysicalName().getString());
                                 phoenixTransactionContext.markDMLFence(table);
                             }
-                            hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable());
+                            // Only pass true for last argument if the index is being written to on it's own (i.e. initial
+                            // index population), not if it's being written to for normal maintenance due to writes to
+                            // the data table. This case is different because the initial index population does not need
+                            // to be done transactionally since the index is only made active after all writes have
+                            // occurred successfully.
+                            hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
                         }
                         numMutations = mutationList.size();
                         GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
@@ -981,7 +984,7 @@ public class MutationState implements SQLCloseable {
                         for (final List<Mutation> mutationBatch : mutationBatchList) {
                             if (shouldRetryIndexedMutation) {
                                 // if there was an index write failure, retry the mutation in a loop
-                                final HTableInterface finalHTable = hTable;
+                                final Table finalHTable = hTable;
                                 PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
                                     @Override
                                     public void doMutation() throws IOException {

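The pivotal change is the last argument to getTransactionalTableWriter(). Restated with an explanatory local (the variable name is mine; everything else comes from the hunk): non-transactional writes are requested only when the index itself is the direct write target, i.e. during initial population, while index maintenance driven by data-table writes stays transactional:

    boolean isInitialIndexPopulation =
            tableInfo.isDataTable() && table.getType() == PTableType.INDEX;
    hTable = phoenixTransactionContext.getTransactionalTableWriter(
            connection, table, hTable, isInitialIndexPopulation);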
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 877c939..8a94314 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -34,9 +34,9 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -62,6 +62,7 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PVarbinary;
@@ -79,14 +80,26 @@ import com.google.common.primitives.Longs;
 public class PhoenixTxIndexMutationGenerator {
     private final PhoenixIndexCodec codec;
     private final PhoenixIndexMetaData indexMetaData;
+    private final ConnectionQueryServices services;
+    private final byte[] regionStartKey;
+    private final byte[] regionEndKey;
+    private final byte[] tableName;
 
-    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+    private PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+        this.services = services;
         this.indexMetaData = indexMetaData;
-        this.codec = new PhoenixIndexCodec(conf, regionStartKey, regionEndKey, tableName);
+        this.regionStartKey = regionStartKey;
+        this.regionEndKey = regionEndKey;
+        this.tableName = tableName;
+        this.codec = new PhoenixIndexCodec(conf, tableName);
     }
 
-    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName) {
-        this(conf, indexMetaData, tableName, null, null);
+    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+        this(null, conf, indexMetaData, tableName, regionStartKey, regionEndKey);
+    }
+
+    public PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, PhoenixIndexMetaData indexMetaData, byte[] tableName) {
+        this(services, services.getConfiguration(), indexMetaData, tableName, null, null);
     }
 
     private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
@@ -99,7 +112,7 @@ public class PhoenixTxIndexMutationGenerator {
         stored.addAll(m);
     }
 
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdates(HTableInterface htable, Iterator<? extends Mutation> mutationIterator) throws IOException, SQLException {
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdates(Table htable, Iterator<? extends Mutation> mutationIterator) throws IOException, SQLException {
 
         if (!mutationIterator.hasNext()) {
             return Collections.emptyList();
@@ -179,7 +192,7 @@ public class PhoenixTxIndexMutationGenerator {
             scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
             ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, null, true, -1);
             scanRanges.initializeScan(scan);
-            Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, isImmutable);
+            Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, true);
             // For rollback, we need to see all versions, including
             // the last committed version as there may be multiple
             // checkpointed versions.
@@ -313,7 +326,18 @@ public class PhoenixTxIndexMutationGenerator {
     private void generateDeletes(PhoenixIndexMetaData indexMetaData,
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             byte[] attribValue, TxTableState state) throws IOException {
-        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
+        byte[] regionStartKey = this.regionStartKey;
+        byte[] regionEndKey = this.regionEndKey;
+        if (services != null && indexMetaData.hasLocalIndexes()) {
+            try {
+                HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey());
+                regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
+                regionEndKey = tableRegionLocation.getRegionInfo().getEndKey();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData, regionStartKey, regionEndKey);
         for (IndexUpdate delete : deletes) {
             if (delete.isValid()) {
                 delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
@@ -328,7 +352,18 @@ public class PhoenixTxIndexMutationGenerator {
             TxTableState state)
             throws IOException {
         state.applyMutation();
-        Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData);
+        byte[] regionStartKey = this.regionStartKey;
+        byte[] regionEndKey = this.regionEndKey;
+        if (services != null && indexMetaData.hasLocalIndexes()) {
+            try {
+                HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey());
+                regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
+                regionEndKey = tableRegionLocation.getRegionInfo().getEndKey();
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+        Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData, regionStartKey, regionEndKey);
         boolean validPut = false;
         for (IndexUpdate put : puts) {
             if (put.isValid()) {
@@ -476,8 +511,8 @@ public class PhoenixTxIndexMutationGenerator {
         };
         try {
             PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes);
-            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData,
-                    table.getPhysicalName().getBytes());
+            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices(),connection.getQueryServices().getConfiguration(), indexMetaData,
+                    table.getPhysicalName().getBytes(), null, null);
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }

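The region-boundary lookup is duplicated verbatim in generateDeletes() and generatePuts(); a private helper along these lines (hypothetical, not part of this commit) would express the rule once: boundaries fixed at construction on the server, resolved per row from the region cache when running client-side with local indexes:

    private Pair<byte[], byte[]> resolveRegionBoundaries(TxTableState state)
            throws IOException {
        if (services == null || !indexMetaData.hasLocalIndexes()) {
            return new Pair<byte[], byte[]>(regionStartKey, regionEndKey);
        }
        try {
            HRegionLocation loc =
                    services.getTableRegionLocation(tableName, state.getCurrentRowKey());
            return new Pair<byte[], byte[]>(loc.getRegionInfo().getStartKey(),
                    loc.getRegionInfo().getEndKey());
        } catch (SQLException e) {
            throw new IOException(e); // surface lookup failures to the codec caller
        }
    }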
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index f13e97a..0ff83ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -58,7 +58,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
             Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
             meth.setAccessible(true);
             this.codec = meth.newInstance();
-            this.codec.initialize(conf, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey(), env.getRegion().getRegionInfo().getTable().getName());
+            this.codec.initialize(conf, env.getRegion().getRegionInfo().getTable().getName());
         } catch (Exception e) {
             throw new IOException(e);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index 7dde941..0b70de3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -39,10 +39,12 @@ public interface IndexCodec {
      *            the current state of the table that needs to be cleaned up. Generally, you only care about the latest
      *            column values, for each column you are indexing for each index table.
      * @param context TODO
+     * @param regionStartKey TODO
+     * @param regionEndKey TODO
      * @return the pairs of (deletes, index table name) that should be applied.
      * @throws IOException
      */
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException;
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException;
 
     // table state has the pending update already applied, before calling
     // get the new index entries
@@ -59,10 +61,12 @@ public interface IndexCodec {
      *            the current state of the table that needs to an index update Generally, you only care about the latest
      *            column values, for each column you are indexing for each index table.
      * @param context TODO
+     * @param regionStartKey TODO
+     * @param regionEndKey TODO
      * @return the pairs of (updates,index table name) that should be applied.
      * @throws IOException
      */
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException;
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException;
 
     /**
      * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
@@ -80,5 +84,5 @@ public interface IndexCodec {
      */
     public boolean isEnabled(Mutation m) throws IOException;
 
-    public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName);
+    public void initialize(Configuration conf, byte[] tableName);
 }
\ No newline at end of file

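To make the signature change concrete, here is a minimal no-op implementation of the revised interface (hypothetical class; it assumes the four methods shown in this diff are the whole contract, and imports from org.apache.phoenix.hbase.index.covered and java.util are elided). Region boundaries now arrive per call instead of being cached at initialize():

    public class NoOpIndexCodec implements IndexCodec {
        @Override
        public void initialize(Configuration conf, byte[] tableName) {}

        @Override
        public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context,
                byte[] regionStartKey, byte[] regionEndKey) throws IOException {
            return Collections.<IndexUpdate>emptyList(); // nothing to clean up
        }

        @Override
        public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context,
                byte[] regionStartKey, byte[] regionEndKey) throws IOException {
            return Collections.<IndexUpdate>emptyList(); // nothing to write
        }

        @Override
        public boolean isEnabled(Mutation m) throws IOException {
            return false; // indexing disabled for every mutation
        }
    }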
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 4f65416..8a069f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -136,7 +136,7 @@ public class LocalTableState implements TableState {
      * state for any of the columns you are indexing.
      * <p>
      * <i>NOTE:</i> This method should <b>not</b> be used during
-     * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been
+     * {@link IndexCodec#getIndexDeletes(TableState, BatchState, byte[], byte[])} as the pending update will not yet have been
      * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
      * need to track the indexed columns.
      * <p>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 97ac30d..820a475 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -186,7 +186,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             throws IOException {
 
         // get the index updates for this current batch
-        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData);
+        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey());
         state.resetTrackedColumns();
 
         /*
@@ -224,7 +224,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
     }
 
     /**
-     * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
+     * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData, byte[], byte[])} and then add them to the
      * update map.
      * <p>
      * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc).
@@ -234,7 +234,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      */
     protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
             throws IOException {
-        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData);
+        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey());
         if (cleanup != null) {
             for (IndexUpdate d : cleanup) {
                 if (!d.isValid()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 140c304..d1da5f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -101,7 +101,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -184,9 +184,24 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) {
         List<PTable> indexes = dataTable.getIndexes();
-        serialize(dataTable, ptr, indexes, connection);
+        serializeServerMaintainedIndexes(dataTable, ptr, indexes, connection);
     }
 
+    public static void serializeServerMaintainedIndexes(PTable dataTable, ImmutableBytesWritable ptr,
+            List<PTable> indexes, PhoenixConnection connection) {
+        Iterator<PTable> indexesItr = Collections.emptyListIterator();
+        boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional();
+        if (onlyLocalIndexes) {
+            if (!dataTable.isTransactional()
+                    || !dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) {
+                indexesItr = maintainedLocalIndexes(indexes.iterator());
+            }
+        } else {
+            indexesItr = maintainedIndexes(indexes.iterator());
+        }
+    
+        serialize(dataTable, ptr, Lists.newArrayList(indexesItr), connection);
+    }
     /**
      * For client-side to serialize all IndexMaintainers for a given table
      * @param dataTable data table
@@ -195,22 +210,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      */
     public static void serialize(PTable dataTable, ImmutableBytesWritable ptr,
             List<PTable> indexes, PhoenixConnection connection) {
-        Iterator<PTable> indexesItr;
-        boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional();
-        if (onlyLocalIndexes) {
-            indexesItr = maintainedLocalIndexes(indexes.iterator());
-        } else {
-            indexesItr = maintainedIndexes(indexes.iterator());
-        }
-        if (!indexesItr.hasNext()) {
+        if (indexes.isEmpty()) {
             ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
             return;
         }
-        int nIndexes = 0;
-        while (indexesItr.hasNext()) {
-            nIndexes++;
-            indexesItr.next();
-        }
+        int nIndexes = indexes.size();
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(stream);
         try {
@@ -218,11 +222,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1));
             // Write out data row key schema once, since it's the same for all index maintainers
             dataTable.getRowKeySchema().write(output);
-            indexesItr = onlyLocalIndexes 
-                        ? maintainedLocalIndexes(indexes.iterator())
-                        : maintainedIndexes(indexes.iterator());
-            while (indexesItr.hasNext()) {
-                    
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer 
proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, 
connection));
+            for (PTable index : indexes) {
+                    
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer 
proto = IndexMaintainer.toProto(index.getIndexMaintainer(dataTable, 
connection));
                     byte[] protoBytes = proto.toByteArray();
                     WritableUtils.writeVInt(output, protoBytes.length);
                     output.write(protoBytes);
@@ -285,31 +286,33 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
 
     private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) {
-        ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
-        DataInput input = new DataInputStream(stream);
         List<IndexMaintainer> maintainers = Collections.emptyList();
-        try {
-            int size = WritableUtils.readVInt(input);
-            boolean isDataTableSalted = size < 0;
-            size = Math.abs(size);
-            RowKeySchema rowKeySchema = new RowKeySchema();
-            rowKeySchema.readFields(input);
-            maintainers = Lists.newArrayListWithExpectedSize(size);
-            for (int i = 0; i < size; i++) {
-                if (useProtoForIndexMaintainer) {
-                  int protoSize = WritableUtils.readVInt(input);
-                  byte[] b = new byte[protoSize];
-                  input.readFully(b);
-                  
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer 
proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
-                  maintainers.add(IndexMaintainer.fromProto(proto, 
rowKeySchema, isDataTableSalted));
-                } else {
-                    IndexMaintainer maintainer = new 
IndexMaintainer(rowKeySchema, isDataTableSalted);
-                    maintainer.readFields(input);
-                    maintainers.add(maintainer);
+        if (length > 0) {
+            ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
+            DataInput input = new DataInputStream(stream);
+            try {
+                int size = WritableUtils.readVInt(input);
+                boolean isDataTableSalted = size < 0;
+                size = Math.abs(size);
+                RowKeySchema rowKeySchema = new RowKeySchema();
+                rowKeySchema.readFields(input);
+                maintainers = Lists.newArrayListWithExpectedSize(size);
+                for (int i = 0; i < size; i++) {
+                    if (useProtoForIndexMaintainer) {
+                      int protoSize = WritableUtils.readVInt(input);
+                      byte[] b = new byte[protoSize];
+                      input.readFully(b);
+                      
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer 
proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
+                      maintainers.add(IndexMaintainer.fromProto(proto, 
rowKeySchema, isDataTableSalted));
+                    } else {
+                        IndexMaintainer maintainer = new 
IndexMaintainer(rowKeySchema, isDataTableSalted);
+                        maintainer.readFields(input);
+                        maintainers.add(maintainer);
+                    }
                 }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
             }
-        } catch (IOException e) {
-            throw new RuntimeException(e); // Impossible
         }
         return maintainers;
     }

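For reference, the wire format both serialize() and deserialize() agree on is: a leading vint whose magnitude is the maintainer count and whose sign doubles as the "data table is salted" flag, then the row key schema, then one length-prefixed protobuf per index. A self-contained round trip of the header trick (values chosen for illustration; runs inside any method that throws IOException):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    int nIndexes = 2;
    boolean salted = true; // i.e. dataTable.getBucketNum() != null
    WritableUtils.writeVInt(out, nIndexes * (salted ? -1 : 1)); // writes -2

    DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    int size = WritableUtils.readVInt(in); // -2
    boolean isDataTableSalted = size < 0;  // true
    size = Math.abs(size);                 // 2 maintainers follow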
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index d33e3fe..eb911e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -36,8 +36,8 @@ import com.google.common.collect.Sets;
 
 /**
  * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index (
- * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be (
- * {@link #getIndexUpserts(TableState, IndexMetaData)}).
+ * {@link #getIndexDeletes(TableState, IndexMetaData, byte[], byte[])}) as well as what the new index state should be (
+ * {@link #getIndexUpserts(TableState, IndexMetaData, byte[], byte[])}).
  */
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
@@ -46,23 +46,19 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MAINTAINERS = "IndexMaintainers";
     public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
     
-    private byte[] regionStartKey;
-    private byte[] regionEndKey;
     private byte[] tableName;
     
     public PhoenixIndexCodec() {
         
     }
 
-    public PhoenixIndexCodec(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
-        initialize(conf, regionStartKey, regionEndKey, tableName);
+    public PhoenixIndexCodec(Configuration conf, byte[] tableName) {
+        initialize(conf, tableName);
     }
     
 
     @Override
-    public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
-        this.regionStartKey = regionStartKey;
-        this.regionEndKey = regionEndKey;
+    public void initialize(Configuration conf, byte[] tableName) {
         this.tableName = tableName;
     }
 
@@ -74,7 +70,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
         List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
@@ -97,7 +93,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
         List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 189102d..5041c75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -90,7 +90,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
         Configuration conf = e.getConfiguration();
         String serverName = env.getRegionServerServices().getServerName().getServerName();
-        codec = new PhoenixIndexCodec(conf, env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName());
+        codec = new PhoenixIndexCodec(conf, env.getRegionInfo().getTable().getName());
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
         // setup the actual index writer
         // For transactional tables, we keep the index active upon a write failure

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 9a31238..f02e9d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.tuple.ResultTuple;
@@ -89,7 +90,11 @@ public class ScanningResultIterator implements ResultIterator {
     private void getScanMetrics() {
 
         if (!scanMetricsUpdated && scanMetricsEnabled) {
-            Map<String, Long> scanMetricsMap = scan.getScanMetrics().getMetricsMap();
+            ScanMetrics scanMetrics = scan.getScanMetrics();
+            if (scanMetrics == null) {
+                return;
+            }
+            Map<String, Long> scanMetricsMap = scanMetrics.getMetricsMap();
             if(scanMetricsMap == null) {
                 return;
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index e6b94fb..cc99ae4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -37,8 +37,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.concurrent.GuardedBy;
 
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -51,7 +51,6 @@ import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
@@ -74,7 +73,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 public class TableResultIterator implements ResultIterator {
     private final Scan scan;
-    private final HTableInterface htable;
+    private final Table htable;
     private final ScanMetricsHolder scanMetricsHolder;
     private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR;
     private final long renewLeaseThreshold;
@@ -187,13 +186,13 @@ public class TableResultIterator implements ResultIterator {
                                 newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
                             }
                         }
-                        plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
+                        plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getName().getName());
                         logger.debug(
                                 "Retrying when Hash Join cache is not found on the server ,by sending the cache again");
                         if (retry <= 0) {
                             throw e1;
                         }
-                        Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
+                        Long cacheId = e1.getCacheId();
                         retry--;
                         try {
                             ServerCache cache = caches == null ? null :

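The replacement for the removed HTableInterface.getTableName() is the TableName accessor chain used above; for illustration (the name below is invented):

    TableName name = TableName.valueOf("MY_SCHEMA:MY_TABLE");
    byte[] nameBytes = name.getName();          // what clearTableRegionCache() expects
    String printable = name.getNameAsString();  // "MY_SCHEMA:MY_TABLE"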
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
index 59b26b2..1230f25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java
@@ -24,10 +24,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * Writes mutations directly to HBase using HBase front-door APIs.
  */
@@ -62,7 +68,24 @@ public class DirectHTableWriter {
 
     public void write(List<Mutation> mutations) throws IOException, 
InterruptedException {
         Object[] results = new Object[mutations.size()];
-        table.batch(mutations, results);
+        String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+        if (txnIdStr == null) {
+            table.batch(mutations, results);
+        } else {
+            long ts = Long.parseLong(txnIdStr);
+            PhoenixTransactionProvider provider = 
TransactionFactory.Provider.getDefault().getTransactionProvider();
+            String txnProviderStr = 
conf.get(PhoenixConfigurationUtil.TX_PROVIDER);
+            if (txnProviderStr != null) {
+                provider = 
TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider();
+            }
+            List<Mutation> shadowedMutations = 
Lists.newArrayListWithExpectedSize(mutations.size());
+            for (Mutation m : mutations) {
+                if (m instanceof Put) {
+                    shadowedMutations.add(provider.markPutAsCommitted((Put)m, 
ts, ts));
+                }
+            }
+            table.batch(shadowedMutations, results);
+        }
     }
 
     protected Configuration getConf() {

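Note that the transactional branch above only forwards Put mutations, rewritten as already-committed at the transaction timestamp; other mutation types are silently skipped. A hedged sketch of how a job configuration arms that branch, using the two keys defined in PhoenixConfigurationUtil (the timestamp and provider name are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class WriterTxConfigSketch {
        public static void main(String[] args) {
            Configuration conf = HBaseConfiguration.create();
            // Presence of TX_SCN_VALUE is what routes write() onto the
            // mark-as-committed branch instead of a plain table.batch().
            conf.set(PhoenixConfigurationUtil.TX_SCN_VALUE, Long.toString(1_000_000L));
            // TX_PROVIDER is optional; when absent, the default provider is used.
            conf.set(PhoenixConfigurationUtil.TX_PROVIDER, "OMID");
            // A DirectHTableWriter built over this conf would mark each Put as
            // committed at the given timestamp before batching it to HBase.
        }
    }
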
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 15d41ea..f2f4cdb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -381,6 +381,7 @@ public class IndexTool extends Configured implements Tool {
             if (pdataTable.isTransactional()) {
                 configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
                     Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
+                configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pdataTable.getTransactionProvider().name());
             }
             configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
                 Long.toString(maxTimeRange));

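Because IndexTool persists the provider by its enum name(), downstream consumers can recover it with valueOf(), defaulting when the key is absent. A small sketch of that round trip (the resolve() helper is made up for illustration; the enum calls are the ones used in the diffs above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    import org.apache.phoenix.transaction.TransactionFactory;

    public class TxProviderResolveSketch {
        // Resolve the provider recorded by IndexTool, or fall back to the default.
        static TransactionFactory.Provider resolve(Configuration conf) {
            String name = conf.get(PhoenixConfigurationUtil.TX_PROVIDER);
            return name == null
                    ? TransactionFactory.Provider.getDefault()
                    : TransactionFactory.Provider.valueOf(name);
        }
    }
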
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 9e0d629..353de7a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -21,26 +21,37 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixJobCounters;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * Mapper that hands over rows from data table to the index table.
  *
@@ -93,6 +104,18 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
        
         context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
         
+        PhoenixTransactionProvider provider = null;
+        Configuration conf = context.getConfiguration();
+        long ts = HConstants.LATEST_TIMESTAMP;
+        String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+        if (txnIdStr != null) {
+            ts = Long.parseLong(txnIdStr);
+            provider = TransactionFactory.Provider.getDefault().getTransactionProvider();
+            String txnProviderStr = conf.get(PhoenixConfigurationUtil.TX_PROVIDER);
+            if (txnProviderStr != null) {
+                provider = TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider();
+            }
+        }
         try {
            final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
            final List<Object> values = record.getValues();
@@ -100,16 +123,30 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
            indxWritable.write(this.pStatement);
            this.pStatement.execute();
             
-           final Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true);
-           while (uncommittedDataIterator.hasNext()) {
-                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
-                if (Bytes.compareTo(Bytes.toBytes(indexTableName), kvPair.getFirst()) != 0) {
+           PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class);
+           final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(true);
+           while (iterator.hasNext()) {
+                Pair<byte[], List<Mutation>> pair = iterator.next();
+                if (Bytes.compareTo(Bytes.toBytes(indexTableName), pair.getFirst()) != 0) {
                     // skip edits for other tables
                     continue;
                 }
-                List<KeyValue> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                for (KeyValue kv : keyValueList) {
+                List<KeyValue> keyValues = Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // Guess-timate 5 key values per row
+                for (Mutation mutation : pair.getSecond()) {
+                    if (mutation instanceof Put) {
+                        if (provider != null) {
+                            mutation = provider.markPutAsCommitted((Put)mutation, ts, ts);
+                        }
+                        for (List<Cell> cellList : mutation.getFamilyCellMap().values()) {
+                            List<KeyValue> keyValueList = preUpdateProcessor.preUpsert(mutation.getRow(), KeyValueUtil.ensureKeyValues(cellList));
+                            for (KeyValue keyValue : keyValueList) {
+                                keyValues.add(keyValue);
+                            }
+                        }
+                    }
+                }
+                Collections.sort(keyValues, pconn.getKeyValueBuilder().getKeyValueComparator());
+                for (KeyValue kv : keyValues) {
                     outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
                     context.write(outputKey, kv);
                 }

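For reference, the reworked loop above flattens each Put's family cell map into the KeyValues the mapper emits, before sorting them into write order. An isolated sketch of just that flattening step, assuming the HBase 1.x KeyValueUtil API used in the diff:

    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import com.google.common.collect.Lists;

    public class FlattenPutSketch {
        // Collect every cell of a Put as a KeyValue, the unit the mapper writes out.
        static List<KeyValue> flatten(Put put) {
            List<KeyValue> kvs = Lists.newArrayList();
            for (List<Cell> cells : put.getFamilyCellMap().values()) {
                kvs.addAll(KeyValueUtil.ensureKeyValues(cells));
            }
            return kvs;
        }

        public static void main(String[] args) {
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            System.out.println(flatten(put).size()); // prints 1
        }
    }
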
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 3b63f66..2f552ea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -105,6 +105,8 @@ public final class PhoenixConfigurationUtil {
     
     public static final String TX_SCN_VALUE = "phoenix.mr.txscn.value";
     
+    public static final String TX_PROVIDER = "phoenix.mr.txprovider";
+
     /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
     public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 0820232..d5a5199 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -168,7 +168,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
 
     public QueryLoggerDisruptor getQueryDisruptor();
     
-    public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider);
+    public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider) throws SQLException;
     
     /**
      * Writes a cell to SYSTEM.MUTEX using checkAndPut to ensure only a single client can execute a
@@ -183,4 +183,4 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
      */
     public void deleteMutexCell(String tenantId, String schemaName, String tableName,
             String columnName, String familyName) throws SQLException;
-}
\ No newline at end of file
+}

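With the signature change above, every caller of initTransactionClient() must now handle or declare SQLException. A hedged caller-side sketch (the services reference is assumed to already exist):

    import java.sql.SQLException;
    import org.apache.phoenix.query.ConnectionQueryServices;
    import org.apache.phoenix.transaction.PhoenixTransactionClient;
    import org.apache.phoenix.transaction.TransactionFactory;

    public class TxClientInitSketch {
        static PhoenixTransactionClient init(ConnectionQueryServices services) throws SQLException {
            // Failures to reach the transaction service now surface as a
            // checked SQLException rather than an unchecked error.
            return services.initTransactionClient(TransactionFactory.Provider.getDefault());
        }
    }
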
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dbfd461..ab4678a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -147,7 +147,6 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.coprocessor.SequenceRegionObserver;
 import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
-import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest;
@@ -237,6 +236,7 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.transaction.PhoenixTransactionClient;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.ByteUtil;
@@ -860,10 +860,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
             }
-            // For ALTER TABLE
-            boolean nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
-            boolean isTransactional =
-                    Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || nonTxToTx;
+            TransactionFactory.Provider provider = getTransactionProvider(tableProps);
+            boolean isTransactional = (provider != null);
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
             // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -926,22 +924,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
 
             if (isTransactional) {
-                TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
-                if (provider == null) {
-                    String providerValue = this.props.get(QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER);
-                    provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(providerValue);
-                }
                 Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
                 if (!descriptor.hasCoprocessor(coprocessorClass.getName())) {
                     descriptor.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null);
                 }
+                Class<? extends RegionObserver> coprocessorGCClass = provider.getTransactionProvider().getGCCoprocessor();
+                if (coprocessorGCClass != null) {
+                    if (!descriptor.hasCoprocessor(coprocessorGCClass.getName())) {
+                        descriptor.addCoprocessor(coprocessorGCClass.getName(), null, priority - 10, null);
+                    }
+                }
             } else {
                 // Remove all potential transactional coprocessors
-                for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
-                    Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
+                for (TransactionFactory.Provider aprovider : TransactionFactory.Provider.values()) {
+                    Class<? extends RegionObserver> coprocessorClass = aprovider.getTransactionProvider().getCoprocessor();
+                    Class<? extends RegionObserver> coprocessorGCClass = aprovider.getTransactionProvider().getGCCoprocessor();
                     if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) {
                         descriptor.removeCoprocessor(coprocessorClass.getName());
                     }
+                    if (coprocessorGCClass != null && descriptor.hasCoprocessor(coprocessorGCClass.getName())) {
+                        descriptor.removeCoprocessor(coprocessorGCClass.getName());
+                    }
                 }
             }
         } catch (IOException e) {
@@ -949,6 +952,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
 
+    private TransactionFactory.Provider getTransactionProvider(Map<String,Object> tableProps) {
+        TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps);
+        return provider;
+    }
+    
     private static interface RetriableOperation {
         boolean checkForCompletion() throws TimeoutException, IOException;
         String getOperationName();
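
The refactor above makes a non-null TRANSACTION_PROVIDER property the sole signal that a table is transactional, and each provider now contributes an optional GC coprocessor alongside its transactional coprocessor. A condensed sketch of the install half of that logic, under the same assumptions as the diff (getCoprocessor()/getGCCoprocessor() may each return null):

    import java.io.IOException;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.coprocessor.RegionObserver;
    import org.apache.phoenix.transaction.TransactionFactory;

    public class TxCoprocessorInstallSketch {
        // Install the provider's coprocessors on a descriptor if missing;
        // the GC coprocessor is optional and may be null for some providers.
        static void install(HTableDescriptor desc, TransactionFactory.Provider provider,
                int priority) throws IOException {
            Class<? extends RegionObserver> cp = provider.getTransactionProvider().getCoprocessor();
            if (cp != null && !desc.hasCoprocessor(cp.getName())) {
                desc.addCoprocessor(cp.getName(), null, priority - 10, null);
            }
            Class<? extends RegionObserver> gc = provider.getTransactionProvider().getGCCoprocessor();
            if (gc != null && !desc.hasCoprocessor(gc.getName())) {
                desc.addCoprocessor(gc.getName(), null, priority - 10, null);
            }
        }
    }
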
@@ -1171,15 +1179,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if (!modifyExistingMetaData) {
                     return existingDesc; // Caller already knows that no metadata was changed
                 }
-                boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name()));
+                TransactionFactory.Provider provider = getTransactionProvider(props);
+                boolean willBeTx = provider != null;
                 // If mapping an existing table as transactional, set property so that existing
                 // data is correctly read.
                 if (willBeTx) {
-                    newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString());
+                    if (!equalTxCoprocessor(provider, existingDesc, newDesc)) {
+                        // Cannot switch between different providers
+                        if (hasTxCoprocessor(existingDesc)) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SWITCH_TXN_PROVIDERS)
+                            .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName))
+                            .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException();
+                        }
+                        if (provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALTER_NONTX_TO_TX)) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL)
+                            .setMessage(provider.name())
+                            .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName))
+                            .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException();
+                        }
+                        newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString());
+                    }
                 } else {
                     // If we think we're creating a non transactional table when it's already
                     // transactional, don't allow.
-                    if (existingDesc.hasCoprocessor(TephraTransactionalProcessor.class.getName())) {
+                    if (hasTxCoprocessor(existingDesc)) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
                         .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName))
                         .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException();
@@ -1212,6 +1235,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
         return null; // will never make it here
     }
+
+    private static boolean hasTxCoprocessor(HTableDescriptor descriptor) {
+        for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
+            Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
+            if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean equalTxCoprocessor(TransactionFactory.Provider provider, HTableDescriptor existingDesc, HTableDescriptor newDesc) {
+        Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor();
+        return (coprocessorClass != null && existingDesc.hasCoprocessor(coprocessorClass.getName()) && newDesc.hasCoprocessor(coprocessorClass.getName()));
+    }
 
     private void modifyTable(byte[] tableName, HTableDescriptor newDesc, boolean shouldPoll) throws IOException,
     InterruptedException, TimeoutException, SQLException {
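
Summarizing the ALTER-time rules encoded in the hunk above: if the same provider's coprocessor is already installed, the ALTER is a transactionality no-op; switching between providers is rejected; and a provider may veto going from non-transactional to transactional outright. A self-contained sketch of that decision table, with plain booleans standing in for the hasTxCoprocessor/equalTxCoprocessor helpers and plain SQLException standing in for the SQLExceptionInfo builder:

    import java.sql.SQLException;

    public class AlterToTxRulesSketch {
        static void validate(boolean sameProviderInstalled, boolean anyTxCoprocessorInstalled,
                boolean providerAllowsNonTxToTx) throws SQLException {
            if (sameProviderInstalled) {
                return; // already transactional with this provider: nothing to do
            }
            if (anyTxCoprocessorInstalled) {
                throw new SQLException("CANNOT_SWITCH_TXN_PROVIDERS");
            }
            if (!providerAllowsNonTxToTx) {
                throw new SQLException("CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL");
            }
            // Otherwise READ_NON_TX_DATA is set so pre-existing rows stay readable.
        }
    }
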
@@ -1936,6 +1974,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             } else {
                 indexTableProps = Maps.newHashMapWithExpectedSize(1);
                 indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.valueOf(txValue));
+                indexTableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, tableProps.get(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER));
             }
             for (PTable index : table.getIndexes()) {
                 HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes());
@@ -2046,6 +2085,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         boolean willBeTransactional = false;
         boolean isOrWillBeTransactional = isTransactional;
         Integer newTTL = null;
+        TransactionFactory.Provider txProvider = null;
         for (String family : properties.keySet()) {
             List<Pair<String, Object>> propsList = properties.get(family);
             if (propsList != null && propsList.size() > 0) {
@@ -2056,20 +2096,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     if ((MetaDataUtil.isHTableProperty(propName) ||  TableProperty.isPhoenixTableProperty(propName)) && addingColumns) {
                         // setting HTable and PhoenixTable properties while adding a column is not allowed.
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN)
-                        .setMessage("Property: " + propName).build()
+                        .setMessage("Property: " + propName)
+                        .setSchemaName(table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        .build()
                         .buildException();
                     }
                     if (MetaDataUtil.isHTableProperty(propName)) {
                         // Can't have a column family name for a property that's an HTableProperty
                         if (!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) {
                             throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY)
-                            .setMessage("Column Family: " + family + ", Property: " + propName).build()
+                            .setMessage("Column Family: " + family + ", Property: " + propName)
+                            .setSchemaName(table.getSchemaName().getString())
+                            .setTableName(table.getTableName().getString())
+                            .build()
                             .buildException();
                         }
                         tableProps.put(propName, propValue);
                     } else {
                         if (TableProperty.isPhoenixTableProperty(propName)) {
-                            TableProperty.valueOf(propName).validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
+                            TableProperty tableProp = TableProperty.valueOf(propName);
+                            tableProp.validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType());
                             if (propName.equals(TTL)) {
                                 newTTL = ((Number)prop.getSecond()).intValue();
                                 // Even though TTL is really a HColumnProperty we treat it specially.
@@ -2078,6 +2125,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) {
                                 willBeTransactional = isOrWillBeTransactional = true;
                                 tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue);
+                            } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER) && propValue != null) {
+                                willBeTransactional = isOrWillBeTransactional = true;
+                                tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE);
+                                txProvider = (Provider)TableProperty.TRANSACTION_PROVIDER.getValue(propValue);
+                                tableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, txProvider);
                             }
                         } else {
                             if (MetaDataUtil.isHColumnProperty(propName)) {
@@ -2091,12 +2143,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 // FIXME: This isn't getting triggered as currently a property gets evaluated
                                 // as HTableProp if its neither HColumnProp or PhoenixTableProp.
                                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY)
-                                .setMessage("Column Family: " + family + ", Property: " + propName).build()
+                                .setMessage("Column Family: " + family + ", Property: " + propName)
+                                .setSchemaName(table.getSchemaName().getString())
+                                .setTableName(table.getTableName().getString())
+                                .build()
                                 .buildException();
                             }
                         }
                     }
                 }
+                if (isOrWillBeTransactional && newTTL != null) {
+                    TransactionFactory.Provider isOrWillBeTransactionProvider = txProvider == null ? table.getTransactionProvider() : txProvider;
+                    if (isOrWillBeTransactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
+                        throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
+                        .setMessage(isOrWillBeTransactionProvider.name())
+                        .setSchemaName(table.getSchemaName().getString())
+                        .setTableName(table.getTableName().getString())
+                        .build()
+                        .buildException();
+                    }
+                }
                 if (!colFamilyPropsMap.isEmpty()) {
                     stmtFamiliesPropsMap.put(family, colFamilyPropsMap);
                 }
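
The new TTL guard above is driven by a per-provider feature set rather than hard-coded provider checks, so each provider declares for itself whether TTL is supported on its transactional tables. A minimal probe sketch using the same names as the diff:

    import org.apache.phoenix.transaction.PhoenixTransactionProvider;
    import org.apache.phoenix.transaction.TransactionFactory;

    public class TtlFeatureProbeSketch {
        // True when this provider disallows setting TTL on its transactional tables.
        static boolean ttlUnsupported(TransactionFactory.Provider provider) {
            return provider.getTransactionProvider()
                    .isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL);
        }
    }
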
@@ -4728,7 +4794,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
 
     @Override
-    public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) {
+    public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) throws SQLException {
         PhoenixTransactionClient client = txClients[provider.ordinal()];
         if (client == null) {
             client = txClients[provider.ordinal()] = provider.getTransactionProvider().getTransactionClient(config, connectionInfo);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 147e873..4be4af8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -367,7 +367,7 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
     
     @Override
-    public PhoenixTransactionClient initTransactionClient(Provider provider) {
+    public PhoenixTransactionClient initTransactionClient(Provider provider) throws SQLException {
         return getDelegate().initTransactionClient(provider);
     }
 
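As the delegate change shows, every wrapper in the delegation chain must redeclare the new checked exception for the override to compile. A generic sketch of the pattern (the interface and class names here are illustrative, not Phoenix types):

    import java.sql.SQLException;

    public class DelegationSketch {
        interface TxServices {
            Object initClient() throws SQLException;
        }

        // A pass-through wrapper: no added behavior, but the checked
        // exception must be redeclared on the overriding method.
        static class Delegating implements TxServices {
            private final TxServices delegate;
            Delegating(TxServices delegate) { this.delegate = delegate; }
            @Override
            public Object initClient() throws SQLException {
                return delegate.initClient();
            }
        }
    }
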
