http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 79da59f,d4ef1cf..e76f55f
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@@ -27,59 -29,51 +29,71 @@@ import java.util.Collections
  import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import co.cask.tephra.Transaction;
 +import co.cask.tephra.Transaction.VisibilityLevel;
 +import co.cask.tephra.TransactionAware;
 +import co.cask.tephra.TransactionCodec;
 +import co.cask.tephra.TransactionContext;
 +import co.cask.tephra.TransactionFailureException;
 +import co.cask.tephra.TransactionNotInProgressException;
 +import co.cask.tephra.TransactionSystemClient;
 +import co.cask.tephra.hbase98.TransactionAwareHTable;
  
+ import javax.annotation.Nonnull;
+ import javax.annotation.concurrent.Immutable;
+ 
  import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.client.Delete;
  import org.apache.hadoop.hbase.client.HTableInterface;
  import org.apache.hadoop.hbase.client.Mutation;
 +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.hbase.util.Pair;
+ import org.apache.htrace.Span;
+ import org.apache.htrace.TraceScope;
  import org.apache.phoenix.cache.ServerCacheClient;
  import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 +import org.apache.phoenix.compile.MutationPlan;
 +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
  import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
  import org.apache.phoenix.exception.SQLExceptionCode;
 +import org.apache.phoenix.exception.SQLExceptionInfo;
  import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
  import org.apache.phoenix.index.IndexMaintainer;
  import org.apache.phoenix.index.IndexMetaDataCacheClient;
  import org.apache.phoenix.index.PhoenixIndexCodec;
  import org.apache.phoenix.jdbc.PhoenixConnection;
 +import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
- import org.apache.phoenix.monitoring.PhoenixMetrics;
+ import org.apache.phoenix.monitoring.GlobalClientMetrics;
+ import org.apache.phoenix.monitoring.MutationMetricQueue;
+ import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric;
+ import 
org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue;
+ import org.apache.phoenix.monitoring.ReadMetricQueue;
  import org.apache.phoenix.query.QueryConstants;
  import org.apache.phoenix.schema.IllegalDataException;
  import org.apache.phoenix.schema.MetaDataClient;
  import org.apache.phoenix.schema.PColumn;
+ import org.apache.phoenix.schema.PName;
  import org.apache.phoenix.schema.PRow;
  import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.PTable.IndexType;
  import org.apache.phoenix.schema.PTableType;
+ import org.apache.phoenix.schema.RowKeySchema;
  import org.apache.phoenix.schema.TableRef;
+ import org.apache.phoenix.schema.ValueSchema.Field;
+ import org.apache.phoenix.schema.types.PLong;
  import org.apache.phoenix.trace.util.Tracing;
  import org.apache.phoenix.util.ByteUtil;
  import org.apache.phoenix.util.IndexUtil;
  import org.apache.phoenix.util.LogUtil;
  import org.apache.phoenix.util.PhoenixRuntime;
  import org.apache.phoenix.util.SQLCloseable;
+ import org.apache.phoenix.util.ScanUtil;
  import org.apache.phoenix.util.ServerUtil;
 +import org.apache.phoenix.util.TransactionUtil;
- import org.cloudera.htrace.Span;
- import org.cloudera.htrace.TraceScope;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -98,207 -90,47 +112,220 @@@ import com.google.common.collect.Sets
   */
  public class MutationState implements SQLCloseable {
      private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
 -
 +    private static final TransactionCodec CODEC = new TransactionCodec();
 +    
      private PhoenixConnection connection;
      private final long maxSize;
-     // map from table to rows 
-     //   rows - map from rowkey to columns
-     //      columns - map from column to value
-     private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
 -    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
+     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
mutations;
 +    private final List<TransactionAware> txAwares;
 +    private final TransactionContext txContext;
 +    private final Set<String> uncommittedPhysicalNames = 
Sets.newHashSetWithExpectedSize(10);
 +    
 +    private Transaction tx;
      private long sizeOffset;
      private int numRows = 0;
 +    private boolean txStarted = false;
 +    
++    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
+     private final MutationMetricQueue mutationMetricQueue;
+     private ReadMetricQueue readMetricQueue;
+     
 -    MutationState(long maxSize, PhoenixConnection connection,
 -            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations) {
 -        this.maxSize = maxSize;
 -        this.connection = connection;
 -        this.mutations = mutations;
 -        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
 -        this.mutationMetricQueue = isMetricsEnabled ? new 
MutationMetricQueue()
 -                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
 -    }
 -
      public MutationState(long maxSize, PhoenixConnection connection) {
-         this(maxSize,connection,null);
 -        this(maxSize,connection,0);
++        this(maxSize,connection, null);
 +    }
 +    
 +    public MutationState(MutationState mutationState) {
-         
this(mutationState.maxSize,mutationState.connection,mutationState.getTransaction());
++        this(mutationState.maxSize, mutationState.connection, 
mutationState.getTransaction());
      }
      
      public MutationState(long maxSize, PhoenixConnection connection, long 
sizeOffset) {
 -        this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
 +        this(maxSize, connection, null, sizeOffset);
 +    }
 +    
 +    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx) {
 +        this(maxSize,connection, tx, 0);
 +    }
 +    
 +    private MutationState(long maxSize, PhoenixConnection connection, 
Transaction tx, long sizeOffset) {
-         this.maxSize = maxSize;
-         this.connection = connection;
++      this(maxSize, connection, Maps.<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()),
 tx);
          this.sizeOffset = sizeOffset;
-         this.tx = tx;
-         if (tx == null) {
-           this.txAwares = Collections.emptyList();
-           TransactionSystemClient txServiceClient = 
this.connection.getQueryServices().getTransactionSystemClient();
-           this.txContext = new TransactionContext(txServiceClient);
-         } else {
-             txAwares = Lists.newArrayList();
-             txContext = null;
-         }
      }
      
-     public MutationState(TableRef table, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
-         this.maxSize = maxSize;
-         this.connection = connection;
++      MutationState(long maxSize, PhoenixConnection connection,
++                      Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
mutations,
++                      Transaction tx) {
++              this.maxSize = maxSize;
++              this.connection = connection;
++              this.mutations = mutations;
++              boolean isMetricsEnabled = 
connection.isRequestLevelMetricsEnabled();
++              this.mutationMetricQueue = isMetricsEnabled ? new 
MutationMetricQueue()
++                              : 
NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
++              if (tx == null) {
++                      this.txAwares = Collections.emptyList();
++                      TransactionSystemClient txServiceClient = 
this.connection
++                                      
.getQueryServices().getTransactionSystemClient();
++                      this.txContext = new 
TransactionContext(txServiceClient);
++              } else {
++                      txAwares = Lists.newArrayList();
++                      txContext = null;
++              }
++      }
++
+     public MutationState(TableRef table, 
Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
 -        this(maxSize, connection, sizeOffset);
++        this(maxSize, connection, null, sizeOffset);
          this.mutations.put(table, mutations);
-         this.sizeOffset = sizeOffset;
          this.numRows = mutations.size();
-         this.txAwares = Lists.newArrayList();
-         this.txContext = null;
 +        this.tx = connection.getMutationState().getTransaction();
          throwIfTooBig();
      }
      
 +    public boolean checkpoint(MutationPlan plan) throws SQLException {
 +        Transaction currentTx = getTransaction();
 +        if (currentTx == null || plan.getTargetRef() == null || 
plan.getTargetRef().getTable() == null || 
!plan.getTargetRef().getTable().isTransactional()) {
 +            return false;
 +        }
 +        Set<TableRef> sources = plan.getSourceRefs();
 +        if (sources.isEmpty()) {
 +            return false;
 +        }
 +        // For a DELETE statement, we're always querying the table being 
deleted from. This isn't
 +        // a problem, but it potentially could be if there are other 
references to the same table
 +        // nested in the DELETE statement (as a sub query or join, for 
example).
 +        TableRef ignoreForExcludeCurrent = plan.getOperation() == 
Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
 +        boolean excludeCurrent = false;
 +        String targetPhysicalName = 
plan.getTargetRef().getTable().getPhysicalName().getString();
 +        for (TableRef source : sources) {
 +            if (source.getTable().isTransactional() && 
!source.equals(ignoreForExcludeCurrent)) {
 +                String sourcePhysicalName = 
source.getTable().getPhysicalName().getString();
 +                if (targetPhysicalName.equals(sourcePhysicalName)) {
 +                    excludeCurrent = true;
 +                    break;
 +                }
 +            }
 +        }
 +        // If we're querying the same table we're updating, we must exclude 
our writes to
 +        // it from being visible.
 +        if (excludeCurrent) {
 +            // If any source tables have uncommitted data prior to last 
checkpoint,
 +            // then we must create a new checkpoint.
 +            boolean hasUncommittedData = false;
 +            for (TableRef source : sources) {
 +                String sourcePhysicalName = 
source.getTable().getPhysicalName().getString();
 +                if (source.getTable().isTransactional() && 
uncommittedPhysicalNames.contains(sourcePhysicalName)) {
 +                    hasUncommittedData = true;
 +                    break;
 +                }
 +            }
 +            if (hasUncommittedData) {
 +                try {
 +                      if (txContext == null) {
 +                              tx = currentTx = 
connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
 +                      }  else {
 +                              txContext.checkpoint();
 +                              tx = currentTx = 
txContext.getCurrentTransaction();
 +                      }
 +                    // Since we've checkpointed, we can clear out uncommitted 
set, since a statement run afterwards
 +                    // should see all this data.
 +                    uncommittedPhysicalNames.clear();
 +                } catch (TransactionFailureException | 
TransactionNotInProgressException e) {
 +                    throw new SQLException(e);
 +                              } 
 +            }
 +            // Since we're querying our own table while mutating it, we must 
exclude
 +            // see our current mutations, otherwise we can get erroneous 
results (for DELETE)
 +            // or get into an infinite loop (for UPSERT SELECT).
 +            currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
 +            return true;
 +        }
 +        return false;
 +    }
 +    
 +    private void addTransactionParticipant(TransactionAware txAware) throws 
SQLException {
 +        if (txContext == null) {
 +            txAwares.add(txAware);
 +            assert(tx != null);
 +            txAware.startTx(tx);
 +        } else {
 +            txContext.addTransactionAware(txAware);
 +        }
 +    }
 +    
 +    // Though MutationState is not thread safe in general, this method should 
be because it may
 +    // be called by TableResultIterator in a multi-threaded manner. Since we 
do not want to expose
 +    // the Transaction outside of MutationState, this seems reasonable, as 
the member variables
 +    // would not change as these threads are running.
 +    public HTableInterface getHTable(PTable table) throws SQLException {
 +        HTableInterface htable = 
this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
 +        Transaction currentTx;
 +        if (table.isTransactional() && (currentTx=getTransaction()) != null) {
 +            TransactionAwareHTable txAware = 
TransactionUtil.getTransactionAwareHTable(htable, table);
 +            // Using cloned mutationState as we may have started a new 
transaction already
 +            // if auto commit is true and we need to use the original one 
here.
 +            txAware.startTx(currentTx);
 +            htable = txAware;
 +        }
 +        return htable;
 +    }
 +    
 +    public PhoenixConnection getConnection() {
 +      return connection;
 +    }
 +    
 +    // Kept private as the Transaction may change when check pointed. Keeping 
it private ensures
 +    // no one holds on to a stale copy.
 +    private Transaction getTransaction() {
 +        return tx != null ? tx : txContext != null ? 
txContext.getCurrentTransaction() : null;
 +    }
 +    
 +    public boolean isTransactionStarted() {
 +      return getTransaction() != null;
 +    }
 +    
 +    public long getReadPointer() {
 +      Transaction tx = getTransaction();
 +      return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getReadPointer();
 +    }
 +    
 +    // For testing
 +    public long getWritePointer() {
 +        Transaction tx = getTransaction();
 +        return tx == null ? HConstants.LATEST_TIMESTAMP : 
tx.getWritePointer();
 +    }
 +    
 +    // For testing
 +    public VisibilityLevel getVisibilityLevel() {
 +        Transaction tx = getTransaction();
 +        return tx == null ? null : tx.getVisibilityLevel();
 +    }
 +    
 +    public boolean startTransaction() throws SQLException {
 +        if (txContext == null) {
 +            throw new SQLException("No transaction context"); // TODO: error 
code
 +        }
 +        
 +              if (connection.getSCN() != null) {
 +                      throw new SQLExceptionInfo.Builder(
 +                                      
SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
 +                                      .build().buildException();
 +              }
 +        
 +        try {
 +            if (!txStarted) {
 +                txContext.start();
 +                txStarted = true;
 +                return true;
 +            }
 +        } catch (TransactionFailureException e) {
 +            throw new SQLException(e); // TODO: error code
 +        }
 +        return false;
 +    }
++
+     public static MutationState emptyMutationState(long maxSize, 
PhoenixConnection connection) {
 -        MutationState state = new MutationState(maxSize, connection, 
Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap());
++        MutationState state = new MutationState(maxSize, connection, 
Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), 
null);
+         state.sizeOffset = 0;
+         return state;
+     }
      
      private void throwIfTooBig() {
          if (numRows > maxSize) {
@@@ -312,26 -144,18 +339,27 @@@
      }
      
      /**
-      * Combine a newer mutation with this one, where in the event of overlaps,
-      * the newer one will take precedence.
-      * @param newMutation the newer mutation
+      * Combine a newer mutation with this one, where in the event of 
overlaps, the newer one will take precedence.
+      * Combine any metrics collected for the newer mutation.
+      * 
+      * @param newMutationState the newer mutation state
       */
-     public void join(MutationState newMutation) {
-         if (this == newMutation) { // Doesn't make sense
+     public void join(MutationState newMutationState) {
+         if (this == newMutationState) { // Doesn't make sense
              return;
          }
 +        // TODO: what if new and old have txContext as that's really an error
 +        // Really it's an error if newMutation txContext is not null
 +        if (txContext != null) {
 +            for (TransactionAware txAware : txAwares) {
 +                txContext.addTransactionAware(txAware);
 +            }
 +        } else {
-             txAwares.addAll(newMutation.txAwares);
++            txAwares.addAll(newMutationState.txAwares);
 +        }
-         this.sizeOffset += newMutation.sizeOffset;
+         this.sizeOffset += newMutationState.sizeOffset;
          // Merge newMutation with this one, keeping state from newMutation 
for any overlaps
-         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
entry : newMutation.mutations.entrySet()) {
+         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : newMutationState.mutations.entrySet()) {
              // Replace existing entries for the table with new entries
              TableRef tableRef = entry.getKey();
              PTable table = tableRef.getTable();
@@@ -373,14 -202,72 +406,37 @@@
          throwIfTooBig();
      }
      
-     private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, 
final long timestamp, boolean includeMutableIndexes, final boolean sendAll) {
 -    private static ImmutableBytesPtr 
getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable 
table) {
++
++      private static ImmutableBytesPtr 
getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable 
table) {
+         RowKeySchema schema = table.getRowKeySchema();
+         int rowTimestampColPos = table.getRowTimestampColPos();
+         Field rowTimestampField = schema.getField(rowTimestampColPos); 
+         byte[] rowTimestampBytes = PLong.INSTANCE.toBytes(rowTimestamp, 
rowTimestampField.getSortOrder());
+         int oldOffset = ptr.getOffset();
+         int oldLength = ptr.getLength();
+         // Move the pointer to the start byte of the row timestamp pk
+         schema.position(ptr, 0, rowTimestampColPos);
+         byte[] b  = ptr.get();
+         int newOffset = ptr.getOffset();
+         int length = ptr.getLength();
+         for (int i = newOffset; i < newOffset + length; i++) {
+             // modify the underlying bytes array with the bytes of the row 
timestamp
+             b[i] = rowTimestampBytes[i - newOffset];
+         }
+         // move the pointer back to where it was before.
+         ptr.set(ptr.get(), oldOffset, oldLength);
+         return ptr;
+     }
+     
 -    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long 
timestamp, boolean includeMutableIndexes) {
++      private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final 
TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final 
long timestamp, boolean includeMutableIndexes, final boolean sendAll) { 
+         final PTable table = tableRef.getTable();
 -        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != 
-1;
          final Iterator<PTable> indexes = // Only maintain tables with 
immutable rows through this client-side mechanism
-                 (tableRef.getTable().isImmutableRows() || 
includeMutableIndexes) ? 
-                         
IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator())
 : 
+                 (table.isImmutableRows() || includeMutableIndexes) ? 
+                         
IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : 
                          Iterators.<PTable>emptyIterator();
 -        final List<Mutation> mutations = 
Lists.newArrayListWithExpectedSize(values.size());
 +        final List<Mutation> mutationList = 
Lists.newArrayListWithExpectedSize(values.size());
          final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? 
Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
 -        Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = 
values.entrySet().iterator();
 -        long timestampToUse = timestamp;
 -        while (iterator.hasNext()) {
 -            Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = 
iterator.next();
 -            ImmutableBytesPtr key = rowEntry.getKey();
 -            RowMutationState state = rowEntry.getValue();
 -            if (tableWithRowTimestampCol) {
 -                RowTimestampColInfo rowTsColInfo = 
state.getRowTimestampColInfo();
 -                if (rowTsColInfo.useServerTimestamp()) {
 -                    // regenerate the key with this timestamp.
 -                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, 
table);
 -                } else {
 -                    if (rowTsColInfo.getTimestamp() != null) {
 -                        timestampToUse = rowTsColInfo.getTimestamp();
 -                    }
 -                }
 -            }
 -            PRow row = table.newRow(connection.getKeyValueBuilder(), 
timestampToUse, key);
 -            List<Mutation> rowMutations, rowMutationsPertainingToIndex;
 -            if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) 
{ // means delete
 -                row.delete();
 -                rowMutations = row.toRowMutations();
 -                // Row deletes for index tables are processed by running a 
re-written query
 -                // against the index table (as this allows for flexibility in 
being able to
 -                // delete rows).
 -                rowMutationsPertainingToIndex = Collections.emptyList();
 -            } else {
 -                for (Map.Entry<PColumn,byte[]> valueEntry : 
rowEntry.getValue().getColumnValues().entrySet()) {
 -                    row.setValue(valueEntry.getKey(), valueEntry.getValue());
 -                }
 -                rowMutations = row.toRowMutations();
 -                rowMutationsPertainingToIndex = rowMutations;
 -            }
 -            mutations.addAll(rowMutations);
 -            if (mutationsPertainingToIndex != null) 
mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
 -        }
 +        generateMutations(tableRef, timestamp, values, mutationList, 
mutationsPertainingToIndex);
          return new Iterator<Pair<byte[],List<Mutation>>>() {
              boolean isFirst = true;
  
@@@ -393,24 -280,14 +449,24 @@@
              public Pair<byte[], List<Mutation>> next() {
                  if (isFirst) {
                      isFirst = false;
-                     return new 
Pair<byte[],List<Mutation>>(tableRef.getTable().getPhysicalName().getBytes(),mutationList);
 -                    return new 
Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(),mutations);
++                    return new 
Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(), mutationList);
                  }
                  PTable index = indexes.next();
                  List<Mutation> indexMutations;
                  try {
                      indexMutations =
-                             IndexUtil.generateIndexData(tableRef.getTable(), 
index, mutationsPertainingToIndex,
-                                     connection.getKeyValueBuilder(), 
connection);
+                             IndexUtil.generateIndexData(table, index, 
mutationsPertainingToIndex,
 -                                tempPtr, connection.getKeyValueBuilder(), 
connection);
++                                connection.getKeyValueBuilder(), connection);
 +                    // we may also have to include delete mutations for 
immutable tables if we are not processing all the tables in the mutations map
 +                    if (!sendAll) {
 +                          TableRef key = new TableRef(index);
-                                               Map<ImmutableBytesPtr, 
Map<PColumn, byte[]>> rowToColumnMap = mutations.remove(key);
++                                              Map<ImmutableBytesPtr, 
RowMutationState> rowToColumnMap = mutations.remove(key);
 +                          if (rowToColumnMap!=null) {
 +                                  final List<Mutation> deleteMutations = 
Lists.newArrayList();
 +                                  generateMutations(tableRef, timestamp, 
rowToColumnMap, deleteMutations, null);
 +                                  indexMutations.addAll(deleteMutations);
 +                          }
 +                    }
                  } catch (SQLException e) {
                      throw new IllegalDataException(e);
                  }
@@@ -424,54 -301,24 +480,69 @@@
              
          };
      }
 +
 +      private void generateMutations(final TableRef tableRef, long timestamp,
-                       final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> 
values,
++                      final Map<ImmutableBytesPtr, RowMutationState> values,
 +                      final List<Mutation> mutationList,
 +                      final List<Mutation> mutationsPertainingToIndex) {
-               Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
iterator = values.entrySet().iterator();
++              final PTable table = tableRef.getTable();
++        boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != 
-1;
++              Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> 
iterator = values.entrySet().iterator();
++              long timestampToUse = timestamp;
 +        while (iterator.hasNext()) {
-             Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = 
iterator.next();
++            Entry<ImmutableBytesPtr, RowMutationState> rowEntry = 
iterator.next();
 +            ImmutableBytesPtr key = rowEntry.getKey();
-                       PRow row = 
tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
++            RowMutationState state = rowEntry.getValue();
++            if (tableWithRowTimestampCol) {
++                RowTimestampColInfo rowTsColInfo = 
state.getRowTimestampColInfo();
++                if (rowTsColInfo.useServerTimestamp()) {
++                    // regenerate the key with this timestamp.
++                    key = getNewRowKeyWithRowTimestamp(key, timestampToUse, 
table);
++                } else {
++                    if (rowTsColInfo.getTimestamp() != null) {
++                        timestampToUse = rowTsColInfo.getTimestamp();
++                    }
++                }
++            }
++                      PRow row = 
tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestampToUse, 
key);
 +                      List<Mutation> rowMutations, 
rowMutationsPertainingToIndex;
-                       if (rowEntry.getValue() == PRow.DELETE_MARKER) { // 
means delete
++                      if (rowEntry.getValue().getColumnValues() == 
PRow.DELETE_MARKER) { // means delete
 +                          row.delete();
 +                          rowMutations = row.toRowMutations();
 +                          // Row deletes for index tables are processed by 
running a re-written query
 +                          // against the index table (as this allows for 
flexibility in being able to
 +                          // delete rows).
 +                          rowMutationsPertainingToIndex = 
Collections.emptyList();
 +                      } else {
-                           for (Map.Entry<PColumn,byte[]> valueEntry : 
rowEntry.getValue().entrySet()) {
++                              for (Map.Entry<PColumn,byte[]> valueEntry : 
rowEntry.getValue().getColumnValues().entrySet()) {
 +                              row.setValue(valueEntry.getKey(), 
valueEntry.getValue());
 +                          }
 +                          rowMutations = row.toRowMutations();
 +                          rowMutationsPertainingToIndex = rowMutations;
 +                      }
 +                      mutationList.addAll(rowMutations);
 +                      if (mutationsPertainingToIndex != null) 
mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
 +        }
 +      }
      
      /**
       * Get the unsorted list of HBase mutations for the tables with 
uncommitted data.
       * @return list of HBase mutations for uncommitted data.
       */
 -    public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
 -        return toMutations(false);
 +    public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long timestamp) {
 +        return toMutations(false, timestamp);
      }
      
 -    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean 
includeMutableIndexes) {
 +    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean 
includeMutableIndexes, final Long tableTimestamp) {
-         final Iterator<Map.Entry<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = 
this.mutations.entrySet().iterator();
+         final Iterator<Map.Entry<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>> iterator = 
this.mutations.entrySet().iterator();
          if (!iterator.hasNext()) {
              return Iterators.emptyIterator();
          }
          Long scn = connection.getSCN();
 -        final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : 
scn;
 +        final long timestamp = (tableTimestamp!=null && 
tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null 
? HConstants.LATEST_TIMESTAMP : scn);
 +//        final long timestamp = (scn == null ? HConstants.LATEST_TIMESTAMP : 
scn);
          return new Iterator<Pair<byte[],List<Mutation>>>() {
-             private Map.Entry<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next();
+             private Map.Entry<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
              private Iterator<Pair<byte[],List<Mutation>>> innerIterator = 
init();
                      
              private Iterator<Pair<byte[],List<Mutation>>> init() {
@@@ -499,353 -346,213 +570,359 @@@
          };
      }
          
--    /**
--     * Validates that the meta data is valid against the server meta data if 
we haven't yet done so.
--     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server 
to see if the meta data
--     * has changed.
--     * @param connection
--     * @return the server time to use for the upsert
--     * @throws SQLException if the table or any columns no longer exist
--     */
-     private long[] validateAll() throws SQLException {
 -    private long[] validate() throws SQLException {
--        int i = 0;
-         long[] timeStamps = new long[this.mutations.size()];
-         for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
entry : mutations.entrySet()) {
-             TableRef tableRef = entry.getKey();
-             timeStamps[i++] = validate(tableRef, entry.getValue());
-         }
-         return timeStamps;
-     }
-     
-     private long validate(TableRef tableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> values) throws SQLException {
--        Long scn = connection.getSCN();
--        MetaDataClient client = new MetaDataClient(connection);
-         long serverTimeStamp = tableRef.getTimeStamp();
-         PTable table = tableRef.getTable();
-         // If we're auto committing, we've already validated the schema when 
we got the ColumnResolver,
-         // so no need to do it again here.
-         if (!connection.getAutoCommit()) {
 -        long[] timeStamps = new long[this.mutations.size()];
 -        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : mutations.entrySet()) {
 -            TableRef tableRef = entry.getKey();
 -            long serverTimeStamp = tableRef.getTimeStamp();
 -            PTable table = tableRef.getTable();
 -            // If we're auto committing, we've already validated the schema 
when we got the ColumnResolver,
 -            // so no need to do it again here.
 -            if (!connection.getAutoCommit()) {
 -                MetaDataMutationResult result = 
client.updateCache(table.getSchemaName().getString(), 
table.getTableName().getString());
 -                long timestamp = result.getMutationTime();
 -                if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
 -                    serverTimeStamp = timestamp;
 -                    if (result.wasUpdated()) {
 -                        // TODO: use bitset?
 -                        table = result.getTable();
 -                        PColumn[] columns = new 
PColumn[table.getColumns().size()];
 -                        for (Map.Entry<ImmutableBytesPtr,RowMutationState> 
rowEntry : entry.getValue().entrySet()) {
 -                              RowMutationState valueEntry = 
rowEntry.getValue();
 -                            if (valueEntry != null) {
 -                              Map<PColumn, byte[]> colValues = 
valueEntry.getColumnValues();
 -                              if (colValues != PRow.DELETE_MARKER) {
 -                                      for (PColumn column : 
colValues.keySet()) {
 -                                          columns[column.getPosition()] = 
column;
 -                                      }
 -                              }
 -                            }
++      /**
++       * Validates that the meta data is valid against the server meta data 
if we haven't yet done so.
++       * Otherwise, for every UPSERT VALUES call, we'd need to hit the server 
to see if the meta data
++       * has changed.
++       * @param connection
++       * @return the server time to use for the upsert
++       * @throws SQLException if the table or any columns no longer exist
++       */
++      private long[] validateAll() throws SQLException {
++          int i = 0;
++          long[] timeStamps = new long[this.mutations.size()];
++          for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : mutations.entrySet()) {
++              TableRef tableRef = entry.getKey();
++              timeStamps[i++] = validate(tableRef, entry.getValue());
++          }
++          return timeStamps;
++      }
++      
++      private long validate(TableRef tableRef, Map<ImmutableBytesPtr, 
RowMutationState> rowKeyToColumnMap) throws SQLException {
++          Long scn = connection.getSCN();
++          MetaDataClient client = new MetaDataClient(connection);
++          long serverTimeStamp = tableRef.getTimeStamp();
++          PTable table = tableRef.getTable();
++          // If we're auto committing, we've already validated the schema 
when we got the ColumnResolver,
++          // so no need to do it again here.
++          if (!connection.getAutoCommit()) {
 +            MetaDataMutationResult result = 
client.updateCache(table.getSchemaName().getString(), 
table.getTableName().getString());
 +            long timestamp = result.getMutationTime();
 +            if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
 +                serverTimeStamp = timestamp;
 +                if (result.wasUpdated()) {
 +                    // TODO: use bitset?
 +                    table = result.getTable();
 +                    PColumn[] columns = new 
PColumn[table.getColumns().size()];
-                     for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> 
rowEntry : values.entrySet()) {
-                         Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
-                         if (valueEntry != PRow.DELETE_MARKER) {
-                             for (PColumn column : valueEntry.keySet()) {
-                                 columns[column.getPosition()] = column;
-                             }
++                    for (Map.Entry<ImmutableBytesPtr,RowMutationState> 
rowEntry : rowKeyToColumnMap.entrySet()) {
++                      RowMutationState valueEntry = rowEntry.getValue();
++                        if (valueEntry != null) {
++                              Map<PColumn, byte[]> colValues = 
valueEntry.getColumnValues();
++                              if (colValues != PRow.DELETE_MARKER) {
++                                for (PColumn column : colValues.keySet()) {
++                                    columns[column.getPosition()] = column;
++                                }
++                              }
                          }
 -                        for (PColumn column : columns) {
 -                            if (column != null) {
 -                                
table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
 -                            }
 +                    }
 +                    for (PColumn column : columns) {
 +                        if (column != null) {
 +                            
table.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
                          }
 -                        tableRef.setTable(table);
                      }
 +                    tableRef.setTable(table);
                  }
              }
 -            timeStamps[i++] = scn == null ? serverTimeStamp == 
QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp 
: scn;
          }
-         return scn == null ? serverTimeStamp == 
QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp 
: scn;
 -        return timeStamps;
--    }
++          return scn == null ? serverTimeStamp == 
QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp 
: scn;
++      }
      
-     private static void logMutationSize(HTableInterface htable, 
List<Mutation> mutations, PhoenixConnection connection) {
+     private static long calculateMutationSize(List<Mutation> mutations) {
          long byteSize = 0;
-         int keyValueCount = 0;
-         if (PhoenixMetrics.isMetricsEnabled() || logger.isDebugEnabled()) {
+         if (GlobalClientMetrics.isMetricsEnabled()) {
              for (Mutation mutation : mutations) {
                  byteSize += mutation.heapSize();
              }
-             MUTATION_BYTES.update(byteSize);
-             if (logger.isDebugEnabled()) {
-                 logger.debug(LogUtil.addCustomAnnotations("Sending " + 
mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + 
" with " + keyValueCount + " key values of total size " + byteSize + " bytes", 
connection));
-             }
          }
+         GLOBAL_MUTATION_BYTES.update(byteSize);
+         return byteSize;
      }
      
 +    private boolean hasKeyValueColumn(PTable table, PTable index) {
 +        IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
 +        return !maintainer.getAllColumns().isEmpty();
 +    }
 +    
 +    private void divideImmutableIndexes(Iterator<PTable> 
enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> 
keyValueIndexes) {
 +        while (enabledImmutableIndexes.hasNext()) {
 +            PTable index = enabledImmutableIndexes.next();
 +            if (index.getIndexType() != IndexType.LOCAL) {
 +                if (hasKeyValueColumn(table, index)) {
 +                    keyValueIndexes.add(index);
 +                } else {
 +                    rowKeyIndexes.add(index);
 +                }
 +            }
 +        }
 +    }
 +    private class MetaDataAwareHTable extends DelegateHTableInterface {
 +        private final TableRef tableRef;
 +        
 +        private MetaDataAwareHTable(HTableInterface delegate, TableRef 
tableRef) {
 +            super(delegate);
 +            this.tableRef = tableRef;
 +        }
 +        
 +        /**
 +         * Called by Tephra when a transaction is aborted. We have this 
wrapper so that we get an
 +         * opportunity to attach our index meta data to the mutations such 
that we can also undo
 +         * the index mutations.
 +         */
 +        @Override
 +        public void delete(List<Delete> deletes) throws IOException {
 +            try {
 +                PTable table = tableRef.getTable();
 +                List<PTable> indexes = table.getIndexes();
 +                Iterator<PTable> enabledIndexes = 
IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
 +                if (enabledIndexes.hasNext()) {
 +                    List<PTable> keyValueIndexes = Collections.emptyList();
 +                    ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
 +                    boolean attachMetaData = 
table.getIndexMaintainers(indexMetaDataPtr, connection);
 +                    if (table.isImmutableRows()) {
 +                        List<PTable> rowKeyIndexes = 
Lists.newArrayListWithExpectedSize(indexes.size());
 +                        keyValueIndexes = 
Lists.newArrayListWithExpectedSize(indexes.size());
 +                        divideImmutableIndexes(enabledIndexes, table, 
rowKeyIndexes, keyValueIndexes);
 +                        // Generate index deletes for immutable indexes that 
only reference row key
 +                        // columns and submit directly here.
 +                        ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
 +                        for (PTable index : rowKeyIndexes) {
 +                            List<Delete> indexDeletes = 
IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, 
connection.getKeyValueBuilder(), connection);
 +                            HTableInterface hindex = 
connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
 +                            hindex.delete(indexDeletes);
 +                        }
 +                    }
 +                    
 +                    // If we have mutable indexes, local immutable indexes, 
or global immutable indexes
 +                    // that reference key value columns, setup index meta 
data and attach here. In this
 +                    // case updates to the indexes will be generated on the 
server side.
 +                    // An alternative would be to let Tephra track the row 
keys for the immutable index
 +                    // by adding it as a transaction participant (soon we can 
prevent any conflict
 +                    // detection from occurring) with the downside being the 
additional memory required.
 +                    if (!keyValueIndexes.isEmpty()) {
 +                        attachMetaData = true;
 +                        IndexMaintainer.serializeAdditional(table, 
indexMetaDataPtr, keyValueIndexes, connection);
 +                    }
 +                    if (attachMetaData) {
 +                        setMetaDataOnMutations(tableRef, deletes, 
indexMetaDataPtr);
 +                    }
 +                }
 +                delegate.delete(deletes);
 +            } catch (SQLException e) {
 +                throw new IOException(e);
 +            }
 +        }
 +    }
 +    
      @SuppressWarnings("deprecation")
 -    public void commit() throws SQLException {
 +    private void send(Iterator<TableRef> tableRefIterator) throws 
SQLException {
          int i = 0;
 -        PName tenantId = connection.getTenantId();
 -        long[] serverTimeStamps = validate();
 -        Iterator<Map.Entry<TableRef, 
Map<ImmutableBytesPtr,RowMutationState>>> iterator = 
this.mutations.entrySet().iterator();
 +        long[] serverTimeStamps = null;
 +        boolean sendAll = false;
 +        // Validate up front if not transactional so that we fail fast before sending any mutations.
 +        if (tableRefIterator == null) {
 +            serverTimeStamps = validateAll();
 +            tableRefIterator = mutations.keySet().iterator();
 +            sendAll = true;
 +        }
 +
          // add tracing for this operation
-         TraceScope trace = Tracing.startNewSpan(connection, "Committing 
mutations to tables");
-         Span span = trace.getSpan();
-         ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
-         while (tableRefIterator.hasNext()) {
-             TableRef tableRef = tableRefIterator.next();
-             Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = 
mutations.get(tableRef);
-             if (valuesMap == null) {
-                 continue;
-             }
-             PTable table = tableRef.getTable();
-             // Track tables to which we've sent uncommitted data
-             if (table.isTransactional()) {
-                 
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-             }
-             table.getIndexMaintainers(indexMetaDataPtr, connection);
-             boolean isDataTable = true;
-             // Validate as we go if transactional since we can undo if a 
problem occurs (which is unlikely)
-             long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
-             Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
-             while (mutationsIterator.hasNext()) {
-                 Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                 byte[] htableName = pair.getFirst();
-                 List<Mutation> mutations = pair.getSecond();
-                 
-                 //create a span per target table
-                 //TODO maybe we can be smarter about the table name to string 
here?
-                 Span child = Tracing.child(span,"Writing mutation batch for 
table: "+Bytes.toString(htableName));
- 
-                 int retryCount = 0;
-                 boolean shouldRetry = false;
-                 do {
-                     ServerCache cache = null;
-                     if (isDataTable) {
-                         cache = setMetaDataOnMutations(tableRef, mutations, 
indexMetaDataPtr);
-                     }
-                 
-                     // If we haven't retried yet, retry for this case only, 
as it's possible that
-                     // a split will occur after we send the index metadata 
cache to all known
-                     // region servers.
-                     shouldRetry = cache != null;
-                     SQLException sqlE = null;
-                     HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
-                     try {
-                         if (table.isTransactional()) {
-                             // If we have indexes, wrap the HTable in a 
delegate HTable that
-                             // will attach the necessary index meta data in 
the event of a
-                             // rollback
-                             if (!table.getIndexes().isEmpty()) {
-                                 hTable = new MetaDataAwareHTable(hTable, 
tableRef);
-                             }
-                             TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable, table);
-                             // Don't add immutable indexes (those are the 
only ones that would participate
-                             // during a commit), as we don't need conflict 
detection for these.
-                             if (isDataTable) {
-                                 // Even for immutable, we need to do this so 
that an abort has the state
-                                 // necessary to generate the rows to delete.
-                                 addTransactionParticipant(txnAware);
-                             } else {
-                                 txnAware.startTx(getTransaction());
-                             }
-                             hTable = txnAware;
-                         }
-                         logMutationSize(hTable, mutations, connection);
-                         MUTATION_BATCH_SIZE.update(mutations.size());
-                         long startTime = System.currentTimeMillis();
-                         child.addTimelineAnnotation("Attempt " + retryCount);
-                         hTable.batch(mutations);
-                         child.stop();
-                         long duration = System.currentTimeMillis() - 
startTime;
-                         MUTATION_COMMIT_TIME.update(duration);
-                         shouldRetry = false;
-                         if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + 
mutations.size() + " mutations into " + table.getName().getString() + ": " + 
duration + " ms", connection));
-                     } catch (Exception e) {
-                         SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
-                         if (inferredE != null) {
-                             if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                 // Swallow this exception once, as it's 
possible that we split after sending the index metadata
-                                 // and one of the region servers doesn't have 
it. This will cause it to have it the next go around.
-                                 // If it fails again, we don't retry.
-                                 String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
-                                 logger.warn(LogUtil.addCustomAnnotations(msg, 
connection));
-                                 
connection.getQueryServices().clearTableRegionCache(htableName);
- 
-                                 // add a new child span as this one failed
-                                 child.addTimelineAnnotation(msg);
-                                 child.stop();
-                                 child = Tracing.child(span,"Failed batch, 
attempting retry");
- 
-                                 continue;
-                             }
-                             e = inferredE;
-                         }
-                         // Throw to client with both what was committed so 
far and what is left to be committed.
-                         // That way, client can either undo what was done or 
try again with what was not done.
-                         sqlE = new CommitException(e, this);
-                     } finally {
-                         try {
-                             if (cache != null) {
-                                 cache.close();
-                             }
-                         } finally {
-                             try {
-                                 hTable.close();
-                             } 
-                             catch (IOException e) {
-                                 if (sqlE != null) {
-                                     
sqlE.setNextException(ServerUtil.parseServerException(e));
-                                 } else {
-                                     sqlE = ServerUtil.parseServerException(e);
-                                 }
-                             } 
-                             if (sqlE != null) {
-                                 throw sqlE;
-                             }
-                         }
-                     }
-                 } while (shouldRetry && retryCount++ < 1);
-                 isDataTable = false;
-             }
-             if (tableRef.getTable().getType() != PTableType.INDEX) {
-                 numRows -= valuesMap.size();
-             }
-             // Remove batches as we process them
-             if (sendAll) {
-               tableRefIterator.remove(); // Iterating through actual map in 
this case
-             } else {
-               mutations.remove(tableRef);
-             }
+         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing 
mutations to tables")) {
+             Span span = trace.getSpan();
 -            while (iterator.hasNext()) {
 -                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry = iterator.next();
 -                // at this point we are going through mutations for each table
 -
 -                Map<ImmutableBytesPtr,RowMutationState> valuesMap = 
entry.getValue();
 -                // above is mutations for a table where the first part is the 
row key and the second part is column values.
 -
 -                TableRef tableRef = entry.getKey();
 -                PTable table = tableRef.getTable();
 -                table.getIndexMaintainers(tempPtr, connection);
 -                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
 -                boolean isDataTable = true;
 -                long serverTimestamp = serverTimeStamps[i++];
 -                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false);
 -                // above returns an iterator of pair where the first  
 -                while (mutationsIterator.hasNext()) {
 -                    Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
 -                    byte[] htableName = pair.getFirst();
 -                    List<Mutation> mutations = pair.getSecond();
 -
 -                    //create a span per target table
 -                    //TODO maybe we can be smarter about the table name to 
string here?
 -                    Span child = Tracing.child(span,"Writing mutation batch 
for table: "+Bytes.toString(htableName));
 -
 -                    int retryCount = 0;
 -                    boolean shouldRetry = false;
 -                    do {
 -                        ServerCache cache = null;
 -                        if (hasIndexMaintainers && isDataTable) {
 -                            byte[] attribValue = null;
 -                            byte[] uuidValue;
 -                            if 
(IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, 
tempPtr.getLength())) {
 -                                IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
 -                                cache = 
client.addIndexMetadataCache(mutations, tempPtr);
 -                                child.addTimelineAnnotation("Updated index 
metadata cache");
 -                                uuidValue = cache.getId();
 -                                // If we haven't retried yet, retry for this 
case only, as it's possible that
 -                                // a split will occur after we send the index 
metadata cache to all known
 -                                // region servers.
 -                                shouldRetry = true;
 -                            } else {
 -                                attribValue = 
ByteUtil.copyKeyBytesIfNecessary(tempPtr);
 -                                uuidValue = ServerCacheClient.generateId();
 -                            }
 -                            // Either set the UUID to be able to access the 
index metadata from the cache
 -                            // or set the index metadata directly on the 
Mutation
 -                            for (Mutation mutation : mutations) {
 -                                if (tenantId != null) {
 -                                    byte[] tenantIdBytes = 
ScanUtil.getTenantIdBytes(
 -                                        table.getRowKeySchema(),
 -                                        table.getBucketNum()!=null,
 -                                        tenantId);
 -                                    
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
 -                                }
 -                                
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
 -                                if (attribValue != null) {
 -                                    
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
 -                                }
 -                            }
 -                        }
 -
 -                        SQLException sqlE = null;
 -                        HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
 -                        try {
 -                            long numMutations = mutations.size();
++              ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
++              while (tableRefIterator.hasNext()) {
++                      // at this point we are going through mutations for 
each table
++                  TableRef tableRef = tableRefIterator.next();
++                  Map<ImmutableBytesPtr, RowMutationState> valuesMap = 
mutations.get(tableRef);
++                  if (valuesMap == null) {
++                      continue;
++                  }
++                  PTable table = tableRef.getTable();
++                  // Track tables to which we've sent uncommitted data
++                  if (table.isTransactional()) {
++                      
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
++                  }
++                  table.getIndexMaintainers(indexMetaDataPtr, connection);
++                  boolean isDataTable = true;
++                  // Validate as we go if transactional since we can undo if 
a problem occurs (which is unlikely)
++                  long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
++                  Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = 
addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
++                  while (mutationsIterator.hasNext()) {
++                      Pair<byte[],List<Mutation>> pair = 
mutationsIterator.next();
++                      byte[] htableName = pair.getFirst();
++                      List<Mutation> mutations = pair.getSecond();
++                      
++                      //create a span per target table
++                      //TODO maybe we can be smarter about the table name to 
string here?
++                      Span child = Tracing.child(span,"Writing mutation batch 
for table: "+Bytes.toString(htableName));
++      
++                      int retryCount = 0;
++                      boolean shouldRetry = false;
++                      do {
++                          ServerCache cache = null;
++                          if (isDataTable) {
++                              cache = setMetaDataOnMutations(tableRef, 
mutations, indexMetaDataPtr);
++                          }
++                      
++                          // If we haven't retried yet, retry for this case 
only, as it's possible that
++                          // a split will occur after we send the index 
metadata cache to all known
++                          // region servers.
++                          shouldRetry = cache != null;
++                          SQLException sqlE = null;
++                          HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
++                          try {
++                              if (table.isTransactional()) {
++                                  // If we have indexes, wrap the HTable in a 
delegate HTable that
++                                  // will attach the necessary index meta 
data in the event of a
++                                  // rollback
++                                  if (!table.getIndexes().isEmpty()) {
++                                      hTable = new 
MetaDataAwareHTable(hTable, tableRef);
++                                  }
++                                  TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable, table);
++                                  // Don't add immutable indexes (those are 
the only ones that would participate
++                                  // during a commit), as we don't need 
conflict detection for these.
++                                  if (isDataTable) {
++                                      // Even for immutable, we need to do 
this so that an abort has the state
++                                      // necessary to generate the rows to 
delete.
++                                      addTransactionParticipant(txnAware);
++                                  } else {
++                                      txnAware.startTx(getTransaction());
++                                  }
++                                  hTable = txnAware;
++                              }
++                              long numMutations = mutations.size();
+                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+                             
+                             long startTime = System.currentTimeMillis();
 -                            child.addTimelineAnnotation("Attempt " + 
retryCount);
 -                            hTable.batch(mutations);
 -                            child.stop();
++                              child.addTimelineAnnotation("Attempt " + 
retryCount);
++                              hTable.batch(mutations);
++                              child.stop();
+                             shouldRetry = false;
+                             long mutationCommitTime = 
System.currentTimeMillis() - startTime;
+                             
GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+                             
+                             long mutationSizeBytes = 
calculateMutationSize(mutations);
+                             MutationMetric mutationsMetric = new 
MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
+                             
mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), 
mutationsMetric);
 -                        } catch (Exception e) {
 -                            SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
 -                            if (inferredE != null) {
 -                                if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
 -                                    // Swallow this exception once, as it's 
possible that we split after sending the index metadata
 -                                    // and one of the region servers doesn't 
have it. This will cause it to have it the next go around.
 -                                    // If it fails again, we don't retry.
 -                                    String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
 -                                    
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
 -                                    
connection.getQueryServices().clearTableRegionCache(htableName);
++                          } catch (Exception e) {
++                              SQLException inferredE = 
ServerUtil.parseServerExceptionOrNull(e);
++                              if (inferredE != null) {
++                                  if (shouldRetry && retryCount == 0 && 
inferredE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
++                                      // Swallow this exception once, as it's 
possible that we split after sending the index metadata
++                                      // and one of the region servers 
doesn't have it. This will cause it to have it the next go around.
++                                      // If it fails again, we don't retry.
++                                      String msg = "Swallowing exception and 
retrying after clearing meta cache on connection. " + inferredE;
++                                      
logger.warn(LogUtil.addCustomAnnotations(msg, connection));
++                                      
connection.getQueryServices().clearTableRegionCache(htableName);
++      
++                                      // add a new child span as this one 
failed
++                                      child.addTimelineAnnotation(msg);
++                                      child.stop();
++                                      child = Tracing.child(span,"Failed 
batch, attempting retry");
++      
++                                      continue;
++                                  }
++                                  e = inferredE;
++                              }
++                              // Throw to client with both what was committed 
so far and what is left to be committed.
++                              // That way, client can either undo what was 
done or try again with what was not done.
++                              sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
++                          } finally {
++                              try {
++                                  if (cache != null) {
++                                      cache.close();
++                                  }
++                              } finally {
++                                  try {
++                                      hTable.close();
++                                  } 
++                                  catch (IOException e) {
++                                      if (sqlE != null) {
++                                          
sqlE.setNextException(ServerUtil.parseServerException(e));
++                                      } else {
++                                          sqlE = 
ServerUtil.parseServerException(e);
++                                      }
++                                  } 
++                                  if (sqlE != null) {
++                                      throw sqlE;
++                                  }
++                              }
++                          }
++                      } while (shouldRetry && retryCount++ < 1);
++                      isDataTable = false;
++                  }
++                  if (tableRef.getTable().getType() != PTableType.INDEX) {
++                      numRows -= valuesMap.size();
++                  }
++                  // Remove batches as we process them
++                  if (sendAll) {
++                      tableRefIterator.remove(); // Iterating through actual 
map in this case
++                  } else {
++                      mutations.remove(tableRef);
++                  }
++              }
 +        }
-         trace.close();
 +        // Note that we cannot assume that *all* mutations have been sent, 
since we've optimized this
 +        // now to only send the mutations for the tables we're querying, 
hence we've removed the
 +        // assertions that we're here before.
 +    }
  
 -                                    // add a new child span as this one failed
 -                                    child.addTimelineAnnotation(msg);
 -                                    child.stop();
 -                                    child = Tracing.child(span,"Failed batch, 
attempting retry");
 +    public byte[] encodeTransaction() throws SQLException {
 +        try {
 +            return CODEC.encode(getTransaction());
 +        } catch (IOException e) {
 +            throw new SQLException(e);
 +        }
 +    }
 +    
 +    public static Transaction decodeTransaction(byte[] txnBytes) throws 
IOException {
 +      return (txnBytes == null || txnBytes.length==0) ? null : 
CODEC.decode(txnBytes);
 +    }
  
 -                                    continue;
 -                                }
 -                                e = inferredE;
 -                            }
 -                            sqlE = new CommitException(e, 
getUncommittedStatementIndexes());
 -                        } finally {
 -                            try {
 -                                hTable.close();
 -                            } catch (IOException e) {
 -                                if (sqlE != null) {
 -                                    
sqlE.setNextException(ServerUtil.parseServerException(e));
 -                                } else {
 -                                    sqlE = ServerUtil.parseServerException(e);
 -                                }
 -                            } finally {
 -                                try {
 -                                    if (cache != null) {
 -                                        cache.close();
 -                                    }
 -                                } finally {
 -                                    if (sqlE != null) {
 -                                        throw sqlE;
 -                                    }
 -                                }
 -                            }
 -                        }
 -                    } while (shouldRetry && retryCount++ < 1);
 -                    isDataTable = false;
 -                }
 -                if (tableRef.getTable().getType() != PTableType.INDEX) {
 -                    numRows -= entry.getValue().size();
 +    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? 
extends Mutation> mutations,
 +            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
 +        PTable table = tableRef.getTable();
 +        byte[] tenantId = connection.getTenantId() == null ? null : 
connection.getTenantId().getBytes();
 +        ServerCache cache = null;
 +        byte[] attribValue = null;
 +        byte[] uuidValue = null;
 +        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
 +        if (table.isTransactional()) {
 +            txState = encodeTransaction();
 +        }
 +        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
 +        if (hasIndexMetaData) {
 +            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, 
mutations, indexMetaDataPtr.getLength() + txState.length)) {
 +                IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
 +                cache = client.addIndexMetadataCache(mutations, 
indexMetaDataPtr, txState);
 +                uuidValue = cache.getId();
 +            } else {
 +                attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
 +                uuidValue = ServerCacheClient.generateId();
 +            }
 +        } else if (txState.length == 0) {
 +            return null;
 +        }
 +        // Either set the UUID to be able to access the index metadata from 
the cache
 +        // or set the index metadata directly on the Mutation
 +        for (Mutation mutation : mutations) {
 +            if (tenantId != null) {
 +                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, 
tenantId);
 +            }
 +            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
 +            if (attribValue != null) {
 +                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, 
attribValue);
 +                if (txState.length > 0) {
 +                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
                  }
 -                iterator.remove(); // Remove batches as we process them
 +            } else if (!hasIndexMetaData && txState.length > 0) {
 +                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
              }
          }
 -        assert(numRows==0);
 -        assert(this.mutations.isEmpty());
 +        return cache;
      }
      
 -    public void rollback(PhoenixConnection connection) throws SQLException {
 +    public void clear() throws SQLException {
          this.mutations.clear();
          numRows = 0;
      }
@@@ -853,95 -570,91 +940,182 @@@
      @Override
      public void close() throws SQLException {
      }
 +
 +    private void reset() {
 +        txStarted = false;
 +        tx = null;
 +        uncommittedPhysicalNames.clear();
 +    }
 +    
 +    public void rollback() throws SQLException {
 +        clear();
 +        txAwares.clear();
 +        if (txContext != null) {
 +            try {
 +                if (txStarted) {
 +                    txContext.abort();
 +                }
 +            } catch (TransactionFailureException e) {
 +                throw new SQLException(e); // TODO: error code
 +            } finally {
 +              reset();
 +            }
 +        }
 +    }
 +    
 +    public void commit() throws SQLException {
 +      boolean sendMutationsFailed=false;
 +        try {
 +            send();
 +        } catch (Throwable t) {
 +              sendMutationsFailed=true;
 +              throw t;
 +        } finally {
 +            txAwares.clear();
 +            if (txContext != null) {
 +                try {
 +                    if (txStarted && !sendMutationsFailed) {
 +                        txContext.finish();
 +                    }
 +                } catch (TransactionFailureException e) {
 +                    try {
 +                        txContext.abort(e);
 +                        throw TransactionUtil.getSQLException(e);
 +                    } catch (TransactionFailureException e1) {
 +                        throw TransactionUtil.getSQLException(e);
 +                    }
 +                } finally {
 +                      if (!sendMutationsFailed) {
 +                              reset();
 +                      }
 +                  }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Send mutations to hbase, so they are visible to subsequent reads,
 +     * starting a transaction if transactional and one has not yet been 
started.
 +     * @param tableRefs
 +     * @return true if at least partially transactional and false otherwise.
 +     * @throws SQLException
 +     */
 +    public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws 
SQLException {
 +        Transaction currentTx = getTransaction();
 +        if (currentTx != null) {
 +            // Initialize visibility so that transactions see their own 
writes.
 +            // The checkpoint() method will set it to not see writes if 
necessary.
 +            currentTx.setVisibility(VisibilityLevel.SNAPSHOT);
 +        }
 +        Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, 
new Predicate<TableRef>(){
 +            @Override
 +            public boolean apply(TableRef tableRef) {
 +                return tableRef.getTable().isTransactional();
 +            }
 +        });
 +        if (filteredTableRefs.hasNext()) {
 +            // FIXME: strip table alias to prevent equality check from 
failing due to alias mismatch on null alias.
 +            // We really should be keying the tables based on the physical 
table name.
 +            List<TableRef> strippedAliases = 
Lists.newArrayListWithExpectedSize(mutations.keySet().size());
 +            while (filteredTableRefs.hasNext()) {
 +                TableRef tableRef = filteredTableRefs.next();
 +                strippedAliases.add(new TableRef(null, tableRef.getTable(), 
tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), 
tableRef.hasDynamicCols()));
 +            }
 +            startTransaction();
 +            send(strippedAliases.iterator());
 +            return true;
 +        }
 +        return false;
 +    }
 +        
 +    public void send() throws SQLException {
 +        send(null);
 +    }
+     
+     public static int[] joinSortedIntArrays(int[] a, int[] b) {
+         int[] result = new int[a.length + b.length];
+         int i = 0, j = 0, k = 0, current;
+         while (i < a.length && j < b.length) {
+             current = a[i] < b[j] ? a[i++] : b[j++];
+             for ( ; i < a.length && a[i] == current; i++);
+             for ( ; j < b.length && b[j] == current; j++);
+             result[k++] = current;
+         }
+         while (i < a.length) {
+             for (current = a[i++] ; i < a.length && a[i] == current; i++);
+             result[k++] = current;
+         }
+         while (j < b.length) {
+             for (current = b[j++] ; j < b.length && b[j] == current; j++);
+             result[k++] = current;
+         }
+         return Arrays.copyOf(result, k);
+     }
+     
+     @Immutable
+     public static class RowTimestampColInfo {
+         private final boolean useServerTimestamp;
+         private final Long rowTimestamp; 
+         
+         public static final RowTimestampColInfo NULL_ROWTIMESTAMP_INFO = new 
RowTimestampColInfo(false, null);
+ 
+         public RowTimestampColInfo(boolean autoGenerate, Long value) {
+             this.useServerTimestamp = autoGenerate;
+             this.rowTimestamp = value;
+         }
+         
+         public boolean useServerTimestamp() {
+             return useServerTimestamp;
+         }
+         
+         public Long getTimestamp() {
+             return rowTimestamp;
+         }
+     }
+     
+     public static class RowMutationState {
+         @Nonnull private Map<PColumn,byte[]> columnValues;
+         private int[] statementIndexes;
+         @Nonnull private final RowTimestampColInfo rowTsColInfo;
+         
+         public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, 
int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo) {
+             checkNotNull(columnValues);
+             checkNotNull(rowTsColInfo);
+             this.columnValues = columnValues;
+             this.statementIndexes = new int[] {statementIndex};
+             this.rowTsColInfo = rowTsColInfo;
+         }
+ 
+         Map<PColumn, byte[]> getColumnValues() {
+             return columnValues;
+         }
+ 
+         int[] getStatementIndexes() {
+             return statementIndexes;
+         }
+ 
+         void join(RowMutationState newRow) {
+             getColumnValues().putAll(newRow.getColumnValues());
+             statementIndexes = joinSortedIntArrays(statementIndexes, 
newRow.getStatementIndexes());
+         }
+         
+         @Nonnull
+         RowTimestampColInfo getRowTimestampColInfo() {
+             return rowTsColInfo;
+         }
+        
+     }
+     
+     public ReadMetricQueue getReadMetricQueue() {
+         return readMetricQueue;
+     }
+ 
+     public void setReadMetricQueue(ReadMetricQueue readMetricQueue) {
+         this.readMetricQueue = readMetricQueue;
+     }
+ 
+     public MutationMetricQueue getMutationMetricQueue() {
+         return mutationMetricQueue;
+     }
+     
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index e82934a,297b6cc..a5c4043
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@@ -45,10 -44,11 +45,12 @@@ import org.apache.phoenix.exception.SQL
  import org.apache.phoenix.exception.SQLExceptionInfo;
  import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
  import org.apache.phoenix.expression.Expression;
+ import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
  import org.apache.phoenix.iterate.MappedByteBufferQueue;
+ import org.apache.phoenix.iterate.ParallelScanGrouper;
  import org.apache.phoenix.iterate.ResultIterator;
  import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 +import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
  import org.apache.phoenix.parse.FilterableStatement;
  import org.apache.phoenix.parse.JoinTableNode.JoinType;
  import org.apache.phoenix.query.KeyRange;
@@@ -84,7 -83,7 +86,8 @@@ public class SortMergeJoinPlan implemen
      private final KeyValueSchema rhsSchema;
      private final int rhsFieldPosition;
      private final boolean isSingleValueOnly;
 +    private final Set<TableRef> tableRefs;
+     private final int thresholdBytes;
  
      public SortMergeJoinPlan(StatementContext context, FilterableStatement 
statement, TableRef table, 
              JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, 
List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
@@@ -103,17 -102,10 +106,18 @@@
          this.rhsSchema = buildSchema(rhsTable);
          this.rhsFieldPosition = rhsFieldPosition;
          this.isSingleValueOnly = isSingleValueOnly;
 +        this.tableRefs = 
Sets.newHashSetWithExpectedSize(lhsPlan.getSourceRefs().size() + 
rhsPlan.getSourceRefs().size());
 +        this.tableRefs.addAll(lhsPlan.getSourceRefs());
 +        this.tableRefs.addAll(rhsPlan.getSourceRefs());
+         this.thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
+                 QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
      }
  
-     
 +    @Override
 +    public Operation getOperation() {
 +        return statement.getOperation();
 +    }
 +
      private static KeyValueSchema buildSchema(PTable table) {
          KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
          if (table != null) {
@@@ -644,11 -639,11 +651,16 @@@
              
          }
      }
+     
+     @Override
+     public boolean useRoundRobinIterator() {
+         return false;
+     }
  
 +    @Override
 +    public Set<TableRef> getSourceRefs() {
 +        return tableRefs;
 +    }
 +
  }
  

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 0000000,53745fe..c0035ff
mode 000000,100644..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@@ -1,0 -1,201 +1,220 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.execute;
+ 
+ import java.sql.ParameterMetaData;
+ import java.sql.SQLException;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
++import java.util.Set;
+ 
+ import org.apache.hadoop.hbase.client.Scan;
+ import org.apache.phoenix.compile.ExplainPlan;
+ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+ import org.apache.phoenix.compile.QueryPlan;
+ import org.apache.phoenix.compile.RowProjector;
+ import org.apache.phoenix.compile.ScanRanges;
+ import org.apache.phoenix.compile.StatementContext;
+ import org.apache.phoenix.iterate.ConcatResultIterator;
+ import org.apache.phoenix.iterate.LimitingResultIterator;
+ import org.apache.phoenix.iterate.MergeSortTopNResultIterator;
+ import org.apache.phoenix.iterate.ParallelScanGrouper;
+ import org.apache.phoenix.iterate.ResultIterator;
+ import org.apache.phoenix.iterate.UnionResultIterators;
++import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+ import org.apache.phoenix.parse.FilterableStatement;
+ import org.apache.phoenix.query.KeyRange;
+ import org.apache.phoenix.schema.TableRef;
+ import org.apache.phoenix.util.SQLCloseable;
+ 
++import com.google.common.collect.Sets;
++
+ 
+ public class UnionPlan implements QueryPlan {
+     private static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K
+ 
+     private final TableRef tableRef;
+     private final FilterableStatement statement;
+     private final ParameterMetaData paramMetaData;
+     private final OrderBy orderBy;
+     private final StatementContext parentContext;
+     private final Integer limit;
+     private final GroupBy groupBy;
+     private final RowProjector projector;
+     private final boolean isDegenerate;
+     private final List<QueryPlan> plans;
+     private UnionResultIterators iterators;
+ 
+     public UnionPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector,
+             Integer limit, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> 
plans, ParameterMetaData paramMetaData) throws SQLException {
+         this.parentContext = context;
+         this.statement = statement;
+         this.tableRef = table;
+         this.projector = projector;
+         this.limit = limit;
+         this.orderBy = orderBy;
+         this.groupBy = groupBy;
+         this.plans = plans;
+         this.paramMetaData = paramMetaData;
+         boolean isDegen = true;
+         for (QueryPlan plan : plans) {           
+             if (plan.getContext().getScanRanges() != ScanRanges.NOTHING) {
+                 isDegen = false;
+                 break;
+             } 
+         }
+         this.isDegenerate = isDegen;     
+     }
+ 
+     @Override
+     public boolean isDegenerate() {
+         return isDegenerate;
+     }
+ 
+     @Override
+     public List<KeyRange> getSplits() {
+         if (iterators == null)
+             return null;
+         return iterators.getSplits();
+     }
+ 
+     @Override
+     public List<List<Scan>> getScans() {
+         if (iterators == null)
+             return null;
+         return iterators.getScans();
+     }
+ 
+     @Override
+     public GroupBy getGroupBy() {
+         return groupBy;
+     }
+ 
+     @Override
+     public OrderBy getOrderBy() {
+         return orderBy;
+     }
+ 
+     @Override
+     public TableRef getTableRef() {
+         return tableRef;
+     }
+ 
+     @Override
+     public Integer getLimit() {
+         return limit;
+     }
+ 
+     @Override
+     public RowProjector getProjector() {
+         return projector;
+     }
+ 
+     @Override
+     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
+         return iterator(Collections.<SQLCloseable>emptyList());
+     }
+     
+     @Override
+     public final ResultIterator iterator() throws SQLException {
+         return iterator(Collections.<SQLCloseable>emptyList());
+     }
+ 
+     public final ResultIterator iterator(final List<? extends SQLCloseable> 
dependencies) throws SQLException {
+         this.iterators = new UnionResultIterators(plans, parentContext);
+         ResultIterator scanner;      
+         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
+ 
+         if (isOrdered) { // TopN
+             scanner = new MergeSortTopNResultIterator(iterators, limit, 
orderBy.getOrderByExpressions());
+         } else {
+             scanner = new ConcatResultIterator(iterators);
+             if (limit != null) {
+                 scanner = new LimitingResultIterator(scanner, limit);
+             }          
+         }
+         return scanner;
+     }
+ 
+     @Override
+     public ExplainPlan getExplainPlan() throws SQLException {
+         List<String> steps = new ArrayList<String>();
+         steps.add("UNION ALL OVER " + this.plans.size() + " QUERIES");
+         ResultIterator iterator = iterator();
+         iterator.explain(steps);
+         // Indent plans steps nested under union, except last client-side 
merge/concat step (if there is one)
+         int offset = !orderBy.getOrderByExpressions().isEmpty() || limit != 
null ? 1 : 0;
+         for (int i = 1 ; i < steps.size()-offset; i++) {
+             steps.set(i, "    " + steps.get(i));
+         }
+         return new ExplainPlan(steps);
+     }
+ 
+ 
+     @Override
+     public long getEstimatedSize() {
+         return DEFAULT_ESTIMATED_SIZE;
+     }
+ 
+     @Override
+     public ParameterMetaData getParameterMetaData() {
+         return paramMetaData;
+     }
+ 
+     @Override
+     public FilterableStatement getStatement() {
+         return statement;
+     }
+ 
+     @Override
+     public StatementContext getContext() {
+         return parentContext;
+     }
+ 
+     @Override
+     public boolean isRowKeyOrdered() {
+         return groupBy.isEmpty() ? orderBy.getOrderByExpressions().isEmpty() 
: groupBy.isOrderPreserving();
+     }
+ 
+     public List<QueryPlan> getPlans() {
+         return this.plans;
+     }
+ 
+     @Override
+     public boolean useRoundRobinIterator() throws SQLException {
+         return false;
+     }
++
++      @Override
++      public Operation getOperation() {
++              return statement.getOperation();
++      }
++
++      @Override
++      public Set<TableRef> getSourceRefs() {
++              // TODO is this correct?
++              Set<TableRef> sources = 
Sets.newHashSetWithExpectedSize(plans.size());
++              for (QueryPlan plan : plans) {
++                      sources.addAll(plan.getSourceRefs());
++              }
++              return sources;
++      }
+ }
+ 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------

Reply via email to