http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 9f03747,0000000..5c7d32f
mode 100644,000000..100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@@ -1,445 -1,0 +1,447 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
 + * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
 + * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
 + * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
 + * governing permissions and limitations under the License.
 + */
 +package org.apache.phoenix.index;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +
 +import co.cask.tephra.Transaction;
 +import co.cask.tephra.Transaction.VisibilityLevel;
 +import co.cask.tephra.TxConstants;
 +import co.cask.tephra.hbase98.TransactionAwareHTable;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.CellScanner;
 +import org.apache.hadoop.hbase.CellUtil;
 +import org.apache.hadoop.hbase.CoprocessorEnvironment;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
++import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.TableName;
 +import org.apache.hadoop.hbase.client.HTableInterface;
 +import org.apache.hadoop.hbase.client.Mutation;
 +import org.apache.hadoop.hbase.client.Put;
 +import org.apache.hadoop.hbase.client.Result;
 +import org.apache.hadoop.hbase.client.ResultScanner;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 +import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 +import org.apache.hadoop.hbase.io.TimeRange;
 +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 +import org.apache.hadoop.hbase.regionserver.RegionScanner;
 +import org.apache.hadoop.hbase.util.Pair;
++import org.apache.htrace.Span;
++import org.apache.htrace.Trace;
++import org.apache.htrace.TraceScope;
 +import org.apache.phoenix.compile.ScanRanges;
 +import org.apache.phoenix.hbase.index.MultiMutation;
 +import org.apache.phoenix.hbase.index.ValueGetter;
 +import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 +import org.apache.phoenix.hbase.index.covered.TableState;
 +import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 +import org.apache.phoenix.hbase.index.write.IndexWriter;
 +import org.apache.phoenix.query.KeyRange;
 +import org.apache.phoenix.query.QueryConstants;
 +import org.apache.phoenix.schema.types.PDataType;
 +import org.apache.phoenix.schema.types.PVarbinary;
 +import org.apache.phoenix.trace.TracingUtils;
 +import org.apache.phoenix.trace.util.NullSpan;
 +import org.apache.phoenix.util.ScanUtil;
 +import org.apache.phoenix.util.SchemaUtil;
 +import org.apache.phoenix.util.ServerUtil;
- import org.cloudera.htrace.Span;
- import org.cloudera.htrace.Trace;
- import org.cloudera.htrace.TraceScope;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Maps;
 +import com.google.common.collect.Sets;
++import com.google.inject.Key;
 +
 +/**
 + * Do all the work of managing index updates for a transactional table from a 
single coprocessor. Since the transaction
 + * manager essentially time orders writes through conflict detection, the 
logic to maintain a secondary index is quite a
 + * bit simpler than the non transactional case. For example, there's no need 
to muck with the WAL, as failure scenarios
 + * are handled by aborting the transaction.
 + */
 +public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 +
 +    private static final Log LOG = 
LogFactory.getLog(PhoenixTransactionalIndexer.class);
 +
 +    private PhoenixIndexCodec codec;
 +    private IndexWriter writer;
 +    private boolean stopped;
 +
 +    @Override
 +    public void start(CoprocessorEnvironment e) throws IOException {
 +        final RegionCoprocessorEnvironment env = 
(RegionCoprocessorEnvironment)e;
 +        String serverName = 
env.getRegionServerServices().getServerName().getServerName();
 +        codec = new PhoenixIndexCodec();
 +        codec.initialize(env);
 +
 +        // setup the actual index writer
 +        this.writer = new IndexWriter(env, serverName + "-tx-index-writer");
 +    }
 +
 +    @Override
 +    public void stop(CoprocessorEnvironment e) throws IOException {
 +        if (this.stopped) { return; }
 +        this.stopped = true;
 +        String msg = "TxIndexer is being stopped";
 +        this.writer.stop(msg);
 +    }
 +
 +    private static Iterator<Mutation> getMutationIterator(final 
MiniBatchOperationInProgress<Mutation> miniBatchOp) {
 +        return new Iterator<Mutation>() {
 +            private int i = 0;
 +            
 +            @Override
 +            public boolean hasNext() {
 +                return i < miniBatchOp.size();
 +            }
 +
 +            @Override
 +            public Mutation next() {
 +                return miniBatchOp.getOperation(i++);
 +            }
 +
 +            @Override
 +            public void remove() {
 +                throw new UnsupportedOperationException();
 +            }
 +            
 +        };
 +    }
 +    @Override
 +    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> 
c,
 +            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws 
IOException {
 +
 +        Mutation m = miniBatchOp.getOperation(0);
 +        if (!codec.isEnabled(m)) {
 +            super.preBatchMutate(c, miniBatchOp);
 +            return;
 +        }
 +
 +        Map<String,byte[]> updateAttributes = m.getAttributesMap();
 +        PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
 +        byte[] txRollbackAttribute = 
m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
 +        Collection<Pair<Mutation, byte[]>> indexUpdates = null;
 +        // get the current span, or just use a null-span to avoid a bunch of 
if statements
 +        try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
 +            Span current = scope.getSpan();
 +            if (current == null) {
 +                current = NullSpan.INSTANCE;
 +            }
 +
 +            // get the index updates for all elements in this batch
 +            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, 
getMutationIterator(miniBatchOp), txRollbackAttribute);
 +
 +            current.addTimelineAnnotation("Built index updates, doing 
preStep");
 +            TracingUtils.addAnnotation(current, "index update count", 
indexUpdates.size());
 +
 +            // only write when there are index updates; otherwise we are done
 +            if (!indexUpdates.isEmpty()) {
 +                this.writer.write(indexUpdates);
 +            }
 +        } catch (Throwable t) {
 +            String msg = "Failed to update index with entries:" + 
indexUpdates;
 +            LOG.error(msg, t);
 +            ServerUtil.throwIOException(msg, t);
 +        }
 +    }
 +
 +    private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData 
indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) 
throws IOException {
 +        ResultScanner currentScanner = null;
 +        ResultScanner previousScanner = null;
 +        TransactionAwareHTable txTable = null;
 +        // Collect up all mutations in batch
 +        Map<ImmutableBytesPtr, MultiMutation> mutations =
 +                new HashMap<ImmutableBytesPtr, MultiMutation>();
 +        while(mutationIterator.hasNext()) {
 +            Mutation m = mutationIterator.next();
 +            // add the mutation to the batch set
 +            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
 +            MultiMutation stored = mutations.get(row);
 +            // we haven't seen this row before, so add it
 +            if (stored == null) {
 +                stored = new MultiMutation(row);
 +                mutations.put(row, stored);
 +            }
 +            stored.addAll(m);
 +        }
 +        
 +        // Collect the set of mutable ColumnReferences so that we can first
 +        // run a scan to get the current state. We'll need this to delete
 +        // the existing index rows.
 +        Transaction tx = indexMetaData.getTransaction();
 +        assert(tx != null);
 +        List<IndexMaintainer> indexMaintainers = 
indexMetaData.getIndexMaintainers();
 +        Set<ColumnReference> mutableColumns = 
Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
 +        for (IndexMaintainer indexMaintainer : indexMaintainers) {
 +            // For transactional tables, we use an index maintainer
 +            // to aid in rollback if there's a KeyValue column in the index. 
The alternative would be
 +            // to hold on to all uncommitted index row keys (even ones 
already sent to HBase) on the
 +            // client side.
 +                mutableColumns.addAll(indexMaintainer.getAllColumns());
 +        }
 +
 +        Collection<Pair<Mutation, byte[]>> indexUpdates = new 
ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * 
indexMaintainers.size());
 +        try {
 +            if (!mutableColumns.isEmpty()) {
 +                List<KeyRange> keys = 
Lists.newArrayListWithExpectedSize(mutations.size());
 +                for (ImmutableBytesPtr ptr : mutations.keySet()) {
 +                    
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
 +                }
 +                Scan scan = new Scan();
 +                // Project all mutable columns
 +                for (ColumnReference ref : mutableColumns) {
 +                    scan.addColumn(ref.getFamily(), ref.getQualifier());
 +                }
 +                // Project empty key value column
 +                
scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
QueryConstants.EMPTY_COLUMN_BYTES);
-                 ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
++                ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, 
KeyRange.EVERYTHING_RANGE, null, true, -1);
 +                scanRanges.initializeScan(scan);
 +                scan.setFilter(scanRanges.getSkipScanFilter());
 +                TableName tableName = 
env.getRegion().getRegionInfo().getTable();
 +                HTableInterface htable = env.getTable(tableName);
 +                txTable = new TransactionAwareHTable(htable);
 +                txTable.startTx(tx);
 +                currentScanner = txTable.getScanner(scan);
 +                if (txRollbackAttribute!=null) {
 +                      
tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
 +                      previousScanner = txTable.getScanner(scan);
 +                }
 +            }
 +            // In case of rollback we have to do two scans, one with 
VisibilityLevel.SNAPSHOT to see the current state of the row 
 +            // and another with VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT to 
see the previous state of the row
 +            // so that we can rollback a previous delete + put 
 +            processScanner(env, indexMetaData, txRollbackAttribute, 
previousScanner, mutations, tx, mutableColumns, indexUpdates, false);
 +            processScanner(env, indexMetaData, txRollbackAttribute, 
currentScanner, mutations, tx, mutableColumns, indexUpdates, true);
 +            for (Mutation m : mutations.values()) {
 +              long timestamp = getTimestamp(txRollbackAttribute, 
tx.getWritePointer(), m);
 +              TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), timestamp, m);
 +              // if we did not generate a valid put, we might have to generate 
a delete
 +                if (!generatePuts(indexMetaData, indexUpdates, state)) {
 +                      generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
 +                }
 +            }
 +        } finally {
 +            if (txTable != null) txTable.close();
 +        }
 +        
 +        return indexUpdates;
 +    }
 +
 +      private void processScanner(RegionCoprocessorEnvironment env,
 +                      PhoenixIndexMetaData indexMetaData, byte[] 
txRollbackAttribute,
 +                      ResultScanner scanner,
 +                      Map<ImmutableBytesPtr, MultiMutation> mutations, 
Transaction tx,
 +                      Set<ColumnReference> mutableColumns,
 +                      Collection<Pair<Mutation, byte[]>> indexUpdates, 
boolean removeMutation) throws IOException {
 +              if (scanner != null) {
 +                  Result result;
 +                  ColumnReference emptyColRef = new 
ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(),
 QueryConstants.EMPTY_COLUMN_BYTES);
 +                  while ((result = scanner.next()) != null) {
 +                      Mutation m = removeMutation ? mutations.remove(new 
ImmutableBytesPtr(result.getRow())) : mutations.get(new 
ImmutableBytesPtr(result.getRow()));
 +                      long timestamp = getTimestamp(txRollbackAttribute, 
tx.getWritePointer(), m);
 +                      TxTableState state = new TxTableState(env, 
mutableColumns, indexMetaData.getAttributes(), timestamp, m, emptyColRef, 
result);
 +                      generateDeletes(indexMetaData, indexUpdates, 
txRollbackAttribute, state);
 +                      generatePuts(indexMetaData, indexUpdates, state);
 +                  }
 +              }
 +      }
 +
 +      private long getTimestamp(byte[] txRollbackAttribute, long 
txnWritePointer, Mutation m) {
 +              if (txRollbackAttribute==null) {
 +                      return txnWritePointer;
 +              }
 +              // if this is a rollback, generate mutations with the same 
timestamp as the data row mutation, since that timestamp might be 
 +        // different from the current txn write pointer because of 
checkpoints
 +              long mutationTimestamp = txnWritePointer;
 +              for (Entry<byte[], List<Cell>> entry : 
m.getFamilyCellMap().entrySet()) {
 +                      mutationTimestamp = 
entry.getValue().get(0).getTimestamp();
 +                      break;
 +              }
 +              return mutationTimestamp;
 +      }
 +
 +      private void generateDeletes(PhoenixIndexMetaData indexMetaData,
 +                      Collection<Pair<Mutation, byte[]>> indexUpdates,
 +                      byte[] attribValue, TxTableState state) throws 
IOException {
 +              Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, 
indexMetaData);
 +              for (IndexUpdate delete : deletes) {
 +                  if (delete.isValid()) {
 +                      
delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
attribValue);
 +                      indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
 +                  }
 +              }
 +      }
 +
 +      boolean generatePuts(
 +                      PhoenixIndexMetaData indexMetaData,
 +                      Collection<Pair<Mutation, byte[]>> indexUpdates,
 +                      TxTableState state)
 +                      throws IOException {
 +              state.applyMutation();
 +              Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, 
indexMetaData);
 +              boolean validPut = false;
 +              for (IndexUpdate put : puts) {
 +                  if (put.isValid()) {
 +                      indexUpdates.add(new Pair<Mutation, 
byte[]>(put.getUpdate(),put.getTableName()));
 +                      validPut = true;
 +                  }
 +              }
 +              return validPut;
 +      }
 +
 +
 +    private static class TxTableState implements TableState {
 +        private final Mutation mutation;
 +        private final long currentTimestamp;
 +        private final RegionCoprocessorEnvironment env;
 +        private final Map<String, byte[]> attributes;
-         private final List<Cell> pendingUpdates;
++        private final List<KeyValue> pendingUpdates;
 +        private final Set<ColumnReference> indexedColumns;
 +        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
 +        
 +        private TxTableState(RegionCoprocessorEnvironment env, 
Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long 
currentTimestamp, Mutation mutation) {
 +            this.env = env;
 +            this.currentTimestamp = currentTimestamp;
 +            this.indexedColumns = indexedColumns;
 +            this.attributes = attributes;
 +            this.mutation = mutation;
 +            int estimatedSize = indexedColumns.size();
 +            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
 +            this.pendingUpdates = 
Lists.newArrayListWithExpectedSize(estimatedSize);
 +            try {
 +                CellScanner scanner = mutation.cellScanner();
 +                while (scanner.advance()) {
 +                    Cell cell = scanner.current();
-                     pendingUpdates.add(cell);
++                    pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
 +                }
 +            } catch (IOException e) {
 +                throw new RuntimeException(e); // Impossible
 +            }
 +        }
 +        
 +        public TxTableState(RegionCoprocessorEnvironment env, 
Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long 
currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
 +            this(env, indexedColumns, attributes, currentTimestamp, m);
 +
 +            for (ColumnReference ref : indexedColumns) {
 +                Cell cell = r.getColumnLatestCell(ref.getFamily(), 
ref.getQualifier());
 +                if (cell != null) {
 +                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +                    ptr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
 +                    valueMap.put(ref, ptr);
 +                }
 +            }
 +            /*
 +            Cell cell = r.getColumnLatestCell(emptyColRef.getFamily(), 
emptyColRef.getQualifier());
 +            if (cell != null) {
 +                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +                ptr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
 +                valueMap.put(emptyColRef, ptr);
 +            }
 +            */
 +        }
 +        
 +        @Override
 +        public RegionCoprocessorEnvironment getEnvironment() {
 +            return env;
 +        }
 +
 +        @Override
 +        public long getCurrentTimestamp() {
 +            return currentTimestamp;
 +        }
 +
 +        @Override
 +        public Map<String, byte[]> getUpdateAttributes() {
 +            return attributes;
 +        }
 +
 +        @Override
 +        public byte[] getCurrentRowKey() {
 +            return mutation.getRow();
 +        }
 +
 +        @Override
 +        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
 +            return Collections.emptyList();
 +        }
 +
 +        public void applyMutation() {
 +            /*if (mutation instanceof Delete) {
 +                valueMap.clear();
 +            } else */ {
 +                for (Cell cell : pendingUpdates) {
 +                    if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() 
|| cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
 +                        ColumnReference ref = new 
ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength());
 +                        valueMap.remove(ref);
 +                    } else if (cell.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == 
KeyValue.Type.DeleteFamilyVersion.getCode()) {
 +                        for (ColumnReference ref : indexedColumns) {
 +                            if (ref.matchesFamily(cell.getFamilyArray(), 
cell.getFamilyOffset(), cell.getFamilyLength())) {
 +                                valueMap.remove(ref);
 +                            }
 +                        }
 +                    } else if (cell.getTypeByte() == 
KeyValue.Type.Put.getCode()){
 +                        ColumnReference ref = new 
ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength());
 +                        if (indexedColumns.contains(ref)) {
 +                            ImmutableBytesWritable ptr = new 
ImmutableBytesWritable();
 +                            ptr.set(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
 +                            valueMap.put(ref, ptr);
 +                        }
 +                    } else {
 +                        throw new IllegalStateException("Unexpected mutation 
type for " + cell);
 +                    }
 +                }
 +            }
 +        }
 +        
 +        @Override
-         public Collection<Cell> getPendingUpdate() {
++        public Collection<KeyValue> getPendingUpdate() {
 +            return pendingUpdates;
 +        }
 +
 +        @Override
 +        public Pair<ValueGetter, IndexUpdate> 
getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
 +                throws IOException {
 +            // TODO: creating these objects over and over again is wasteful
 +            ColumnTracker tracker = new ColumnTracker(indexedColumns);
 +            ValueGetter getter = new ValueGetter() {
 +
 +                @Override
 +                public ImmutableBytesWritable getLatestValue(ColumnReference 
ref) throws IOException {
 +                    return valueMap.get(ref);
 +                }
 +
 +                @Override
 +                public byte[] getRowKey() {
 +                    return mutation.getRow();
 +                }
 +                
 +            };
 +            Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, 
IndexUpdate>(getter, new IndexUpdate(tracker));
 +            return pair;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 4690ccc,57e84f0..c18233e
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@@ -49,9 -51,10 +51,11 @@@ import org.apache.phoenix.compile.Query
  import org.apache.phoenix.compile.RowProjector;
  import org.apache.phoenix.compile.ScanRanges;
  import org.apache.phoenix.compile.StatementContext;
+ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+ import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
  import org.apache.phoenix.exception.SQLExceptionCode;
  import org.apache.phoenix.exception.SQLExceptionInfo;
 +import org.apache.phoenix.execute.MutationState;
  import org.apache.phoenix.filter.ColumnProjectionFilter;
  import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
  import org.apache.phoenix.parse.FilterableStatement;
@@@ -102,7 -104,7 +105,8 @@@ public abstract class BaseResultIterato
      private final byte[] physicalTableName;
      private final QueryPlan plan;
      protected final String scanId;
 +    protected final MutationState mutationState;
+     private final ParallelScanGrouper scanGrouper;
      // TODO: too much nesting here - breakup into new classes.
      private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> 
allFutures;
      
@@@ -133,61 -135,81 +137,84 @@@
          return true;
      }
      
-     public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws 
SQLException {
-         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(), plan.getStatement().getHint());
+     private static void initializeScan(QueryPlan plan, Integer perScanLimit) {
+         StatementContext context = plan.getContext();
+         TableRef tableRef = plan.getTableRef();
+         PTable table = tableRef.getTable();
+         Scan scan = context.getScan();
+ 
+         Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
+         // Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix 
their row keys
+         if (context.getConnection().isDescVarLengthRowKeyUpgrade()) {
+             // We project *all* KeyValues across all column families as we 
make a pass over
+             // a physical table and we want to make sure we catch all 
KeyValues that may be
+             // dynamic or part of an updatable view.
+             familyMap.clear();
+             scan.setMaxVersions();
+             scan.setFilter(null); // Remove any filter
+             scan.setRaw(true); // Traverse (and subsequently clone) all 
KeyValues
+             // Pass over PTable so we can re-write rows according to the row 
key schema
+             scan.setAttribute(BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY, 
UngroupedAggregateRegionObserver.serialize(table));
+         } else {
+             FilterableStatement statement = plan.getStatement();
+             RowProjector projector = plan.getProjector();
+             boolean keyOnlyFilter = familyMap.isEmpty() && 
context.getWhereCoditionColumns().isEmpty();
+             if (projector.isProjectEmptyKeyValue()) {
+                 // If nothing projected into scan and we only have one column 
family, just allow everything
+                 // to be projected and use a FirstKeyOnlyFilter to skip from 
row to row. This turns out to
+                 // be quite a bit faster.
+                 // Where condition columns also will get added into familyMap
+                 // When where conditions are present, we can not add 
FirstKeyOnlyFilter at beginning.
+                 if (familyMap.isEmpty() && 
context.getWhereCoditionColumns().isEmpty()
+                         && table.getColumnFamilies().size() == 1) {
+                     // Project the one column family. We must project a 
column family since it's possible
+                     // that there are other non declared column families that 
we need to ignore.
+                     
scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
+                 } else {
+                     byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+                     // Project empty key value unless the column family 
containing it has
+                     // been projected in its entirety.
+                     if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != 
null) {
+                         scan.addColumn(ecf, 
QueryConstants.EMPTY_COLUMN_BYTES);
+                     }
+                 }
+             } else if (table.getViewType() == ViewType.MAPPED) {
+                 // Since we don't have the empty key value in MAPPED tables, 
we must select all CFs in HRS. But only the
+                 // selected column values are returned back to client
+                 for (PColumnFamily family : table.getColumnFamilies()) {
+                     scan.addFamily(family.getName().getBytes());
+                 }
+             }
+             // Add FirstKeyOnlyFilter if there are no references to key value 
columns
+             if (keyOnlyFilter) {
+                 ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
+             }
+             
+             // TODO adding all CFs here is not correct. It should be done 
only after ColumnProjectionOptimization.
+             if (perScanLimit != null) {
+                 ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
+             }
+     
+             doColumnProjectionOptimization(context, scan, table, statement);
+         }
+     }
+     
+     public BaseResultIterators(QueryPlan plan, Integer perScanLimit, 
ParallelScanGrouper scanGrouper) throws SQLException {
+         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit());
          this.plan = plan;
+         this.scanGrouper = scanGrouper;
          StatementContext context = plan.getContext();
 +        // Clone MutationState as the one on the connection will change if 
auto commit is on
 +        // yet we need the original one with the original transaction from 
TableResultIterator.
 +        this.mutationState = new 
MutationState(context.getConnection().getMutationState());
          TableRef tableRef = plan.getTableRef();
          PTable table = tableRef.getTable();
-         FilterableStatement statement = plan.getStatement();
-         RowProjector projector = plan.getProjector();
          physicalTableName = table.getPhysicalName().getBytes();
          tableStats = useStats() ? new 
MetaDataClient(context.getConnection()).getTableStats(table) : 
PTableStats.EMPTY_STATS;
-         Scan scan = context.getScan();
          // Used to tie all the scans together during logging
          scanId = UUID.randomUUID().toString();
-         Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
-         boolean keyOnlyFilter = familyMap.isEmpty() && 
context.getWhereCoditionColumns().isEmpty();
-         if (projector.isProjectEmptyKeyValue()) {
-             // If nothing projected into scan and we only have one column 
family, just allow everything
-             // to be projected and use a FirstKeyOnlyFilter to skip from row 
to row. This turns out to
-             // be quite a bit faster.
-             // Where condition columns also will get added into familyMap
-             // When where conditions are present, we can not add 
FirstKeyOnlyFilter at beginning.
-             if (familyMap.isEmpty() && 
context.getWhereCoditionColumns().isEmpty()
-                     && table.getColumnFamilies().size() == 1) {
-                 // Project the one column family. We must project a column 
family since it's possible
-                 // that there are other non declared column families that we 
need to ignore.
-                 
scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
-             } else {
-                 byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
-                 // Project empty key value unless the column family 
containing it has
-                 // been projected in its entirety.
-                 if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != 
null) {
-                     scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
-                 }
-             }
-         } else if (table.getViewType() == ViewType.MAPPED) {
-             // Since we don't have the empty key value in MAPPED tables, we 
must select all CFs in HRS. But only the
-             // selected column values are returned back to client
-             for (PColumnFamily family : table.getColumnFamilies()) {
-                 scan.addFamily(family.getName().getBytes());
-             }
-         }
-         // Add FirstKeyOnlyFilter if there are no references to key value 
columns
-         if (keyOnlyFilter) {
-             ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
-         }
          
-         // TODO adding all CFs here is not correct. It should be done only 
after ColumnProjectionOptimization.
-         if (perScanLimit != null) {
-             ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
-         }
- 
-         doColumnProjectionOptimization(context, scan, table, statement);
+         initializeScan(plan, perScanLimit);
          
          this.scans = getParallelScans();
          List<KeyRange> splitRanges = 
Lists.newArrayListWithExpectedSize(scans.size() * 
ESTIMATED_GUIDEPOSTS_PER_REGION);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index 488641f,f272e55..ef8c4a6
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@@ -26,7 -27,7 +27,8 @@@ import java.util.List
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.util.Bytes;
+ import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.execute.MutationState;
  import org.apache.phoenix.query.QueryServices;
  import org.apache.phoenix.query.QueryServicesOptions;
  import org.apache.phoenix.schema.TableRef;
@@@ -48,10 -49,10 +50,11 @@@ public class ChunkedResultIterator impl
  
      private final ParallelIteratorFactory delegateIteratorFactory;
      private ImmutableBytesWritable lastKey = new ImmutableBytesWritable();
+     private final StatementContext context;
      private final TableRef tableRef;
 -    private Scan scan;
      private final long chunkSize;
 +    private final MutationState mutationState;
 +    private Scan scan;
      private PeekingResultIterator resultIterator;
  
      public static class ChunkedResultIteratorFactory implements 
ParallelIteratorFactory {
@@@ -70,19 -67,19 +73,19 @@@
          }
  
          @Override
-         public PeekingResultIterator newIterator(ResultIterator scanner, Scan 
scan) throws SQLException {
-             scanner.close(); //close the iterator since we don't need it 
anymore.
+         public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String tableName) throws SQLException {
              if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan)));
-             return new ChunkedResultIterator(delegateFactory, mutationState, 
tableRef, scan, 
 -            return new ChunkedResultIterator(delegateFactory, context, 
tableRef, scan,
 -                    
context.getConnection().getQueryServices().getProps().getLong(
 -                                        QueryServices.SCAN_RESULT_CHUNK_SIZE,
 -                                        
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
++            return new ChunkedResultIterator(delegateFactory, mutationState, 
context, tableRef, scan, 
 +                    
mutationState.getConnection().getQueryServices().getProps().getLong(
 +                                QueryServices.SCAN_RESULT_CHUNK_SIZE,
-                                 
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE));
++                                
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
          }
      }
  
 -    private ChunkedResultIterator(ParallelIteratorFactory 
delegateIteratorFactory,
 -            StatementContext context, TableRef tableRef, Scan scan, long 
chunkSize, ResultIterator scanner) throws SQLException {
 +    private ChunkedResultIterator(ParallelIteratorFactory 
delegateIteratorFactory, MutationState mutationState,
-             TableRef tableRef, Scan scan, long chunkSize) throws SQLException 
{
++              StatementContext context, TableRef tableRef, Scan scan, long 
chunkSize, ResultIterator scanner) throws SQLException {
          this.delegateIteratorFactory = delegateIteratorFactory;
+         this.context = context;
          this.tableRef = tableRef;
          this.scan = scan;
          this.chunkSize = chunkSize;
@@@ -122,9 -118,10 +125,10 @@@
              scan = ScanUtil.newScan(scan);
              scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
              if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator 
over " + tableRef.getTable().getName().getString() + " with " + scan, 
ScanUtil.getCustomAnnotations(scan)));
+             String tableName = 
tableRef.getTable().getPhysicalName().getString();
              ResultIterator singleChunkResultIterator = new 
SingleChunkResultIterator(
-                     new TableResultIterator(mutationState, tableRef, scan), 
chunkSize);
-             resultIterator = 
delegateIteratorFactory.newIterator(singleChunkResultIterator, scan);
 -                    new TableResultIterator(context, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize);
++                    new TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize);
+             resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan, tableName);
          }
          return resultIterator;
      }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index ed81110,87f8335..265831c
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@@ -86,7 -102,7 +102,7 @@@ public class ParallelIterators extends 
                  @Override
                  public PeekingResultIterator call() throws Exception {
                      long startTime = System.currentTimeMillis();
-                     ResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan);
 -                    ResultIterator scanner = new TableResultIterator(context, 
tableRef, scan, scanMetrics);
++                    ResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, scanMetrics);
                      if (logger.isDebugEnabled()) {
                          logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
                      }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 6edb98d,fa18c83..e0936b2
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@@ -79,12 -80,8 +80,8 @@@ public class SerialIterators extends Ba
                  public PeekingResultIterator call() throws Exception {
                        List<PeekingResultIterator> concatIterators = 
Lists.newArrayListWithExpectedSize(scans.size());
                        for (final Scan scan : scans) {
-                           long startTime = System.currentTimeMillis();
-                           ResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, ScannerCreation.DELAYED);
-                           if (logger.isDebugEnabled()) {
-                               logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan)));
-                           }
-                           
concatIterators.add(iteratorFactory.newIterator(scanner, scan));
 -                          ResultIterator scanner = new 
TableResultIterator(context, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
ScannerCreation.DELAYED);
++                          ResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
ScannerCreation.DELAYED);
+                           
concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, 
tableName));
                        }
                        PeekingResultIterator concatIterator = 
ConcatResultIterator.newIterator(concatIterators);
                      allIterators.add(concatIterator);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index e93d917,6f040d1..f1a2496
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@@ -22,9 -22,9 +22,12 @@@ import java.sql.SQLException
  import java.util.List;
  
  import org.apache.hadoop.hbase.client.HTableInterface;
++import org.apache.hadoop.hbase.client.Result;
++import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
 -import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.execute.MutationState;
 +import org.apache.phoenix.schema.PTable;
+ import org.apache.phoenix.monitoring.CombinableMetric;
  import org.apache.phoenix.schema.TableRef;
  import org.apache.phoenix.schema.tuple.Tuple;
  import org.apache.phoenix.util.Closeables;
@@@ -45,9 -45,10 +48,10 @@@ public class TableResultIterator implem
      private final Scan scan;
      private final HTableInterface htable;
      private volatile ResultIterator delegate;
- 
-     public TableResultIterator(MutationState mutationState, Scan scan, 
TableRef tableRef) throws SQLException {
-         this(mutationState, tableRef, scan);
+     private final CombinableMetric scanMetrics;
+     
 -    public TableResultIterator(StatementContext context, TableRef tableRef, 
CombinableMetric scanMetrics) throws SQLException {
 -        this(context, tableRef, context.getScan(), scanMetrics);
++    public TableResultIterator(MutationState mutationState, Scan scan, 
TableRef tableRef, CombinableMetric scanMetrics) throws SQLException {
++        this(mutationState, tableRef, scan, scanMetrics);
      }
  
      /*
@@@ -63,7 -64,7 +67,12 @@@
                  delegate = this.delegate;
                  if (delegate == null) {
                      try {
-                         this.delegate = delegate = isClosing ? 
ResultIterator.EMPTY_ITERATOR : new 
ScanningResultIterator(htable.getScanner(scan));
++                      ResultScanner resultScanner = htable.getScanner(scan);
++                      Result result = null;
++                      while ( (result = resultScanner.next())!=null ) {
++                              System.out.println(result);
++                        }
+                         this.delegate = delegate = isClosing ? 
ResultIterator.EMPTY_ITERATOR : new 
ScanningResultIterator(htable.getScanner(scan), scanMetrics);
                      } catch (IOException e) {
                          Closeables.closeQuietly(htable);
                          throw ServerUtil.parseServerException(e);
@@@ -74,14 -75,15 +83,15 @@@
          return delegate;
      }
      
-     public TableResultIterator(MutationState mutationState, TableRef 
tableRef, Scan scan) throws SQLException {
-         this(mutationState, tableRef, scan, ScannerCreation.IMMEDIATE);
 -    public TableResultIterator(StatementContext context, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics) throws SQLException {
 -        this(context, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE);
++    public TableResultIterator(MutationState mutationState, TableRef 
tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException {
++        this(mutationState, tableRef, scan, scanMetrics, 
ScannerCreation.IMMEDIATE);
      }
  
-     public TableResultIterator(MutationState mutationState, TableRef 
tableRef, Scan scan, ScannerCreation creationMode) throws SQLException {
 -    public TableResultIterator(StatementContext context, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws 
SQLException {
 -        super(context, tableRef);
++    public TableResultIterator(MutationState mutationState, TableRef 
tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation 
creationMode) throws SQLException {
          this.scan = scan;
+         this.scanMetrics = scanMetrics;
 -        htable = 
context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes());
 +        PTable table = tableRef.getTable();
-         this.htable = mutationState.getHTable(table);
++        htable = mutationState.getHTable(table);
          if (creationMode == ScannerCreation.IMMEDIATE) {
                getDelegate(false);
          }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index e1c8dea,6985fc3..153a76e
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@@ -54,9 -53,10 +53,12 @@@ import java.util.concurrent.Executor
  
  import javax.annotation.Nullable;
  
 +import co.cask.tephra.Transaction;
 +
  import org.apache.hadoop.hbase.HConstants;
+ import org.apache.hadoop.hbase.client.Consistency;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.TraceScope;
  import org.apache.phoenix.call.CallRunner;
  import org.apache.phoenix.exception.SQLExceptionCode;
  import org.apache.phoenix.exception.SQLExceptionInfo;
@@@ -154,28 -155,35 +161,39 @@@ public class PhoenixConnection implemen
          return props;
      }
  
-     public PhoenixConnection(PhoenixConnection connection) throws 
SQLException {
-         this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.metaData, connection.getMutationState());
+     public PhoenixConnection(PhoenixConnection connection, boolean 
isDescRowKeyOrderUpgrade) throws SQLException {
 -        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.getMetaDataCache(), 
isDescRowKeyOrderUpgrade);
++        this(connection.getQueryServices(), connection.getURL(), 
connection.getClientInfo(), connection.metaData, connection.getMutationState(), 
isDescRowKeyOrderUpgrade);
          this.isAutoCommit = connection.isAutoCommit;
          this.sampler = connection.sampler;
+         this.statementExecutionCounter = connection.statementExecutionCounter;
+     }
+ 
+     public PhoenixConnection(PhoenixConnection connection) throws 
SQLException {
+         this(connection, connection.isDescVarLengthRowKeyUpgrade);
+     }
+     
+     public PhoenixConnection(PhoenixConnection connection, long scn) throws 
SQLException {
+         this(connection.getQueryServices(), connection, scn);
      }
      
      public PhoenixConnection(ConnectionQueryServices services, 
PhoenixConnection connection, long scn) throws SQLException {
-         this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, 
connection.getMutationState());
 -        this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache(), 
connection.isDescVarLengthRowKeyUpgrade());
++        this(services, connection.getURL(), 
newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, 
connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade());
          this.isAutoCommit = connection.isAutoCommit;
          this.sampler = connection.sampler;
+         this.statementExecutionCounter = connection.statementExecutionCounter;
      }
      
      public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData) throws SQLException {
-         this(services, url, info, metaData, null);
 -        this(services, url, info, metaData, false);
++        this(services, url, info, metaData, null, false);
      }
 -
 -    public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData, boolean isDescVarLengthRowKeyUpgrade) 
throws SQLException {
 +    
 +    public PhoenixConnection(PhoenixConnection connection, 
ConnectionQueryServices services, Properties info) throws SQLException {
-         this(services, connection.url, info, connection.metaData, null);
++        this(services, connection.url, info, connection.metaData, null, 
connection.isDescVarLengthRowKeyUpgrade());
 +    }
 +    
-     public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData, MutationState mutationState) throws 
SQLException {
++    public PhoenixConnection(ConnectionQueryServices services, String url, 
Properties info, PMetaData metaData, MutationState mutationState, boolean 
isDescVarLengthRowKeyUpgrade) throws SQLException {
          this.url = url;
+         this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
          // Copy so client cannot change
          this.info = info == null ? new Properties() : 
PropertiesUtil.deepCopy(info);
          final PName tenantId = JDBCUtil.getTenantId(url, info);
@@@ -243,8 -245,17 +255,17 @@@
                           ! Objects.equal(tenantId, table.getTenantId())) );
              }
              
-         });
-         this.mutationState = mutationState == null ? new 
MutationState(maxSize, this) : new MutationState(mutationState);
+             @Override
+             public boolean prune(PFunction function) {
+                 long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP 
: scn;
+                 return ( function.getTimeStamp() >= maxTimestamp ||
+                          ! Objects.equal(tenantId, function.getTenantId()));
+             }
+         };
+         this.isRequestLevelMetricsEnabled = 
JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, 
this.services.getProps());
 -        this.mutationState = newMutationState(maxSize);
++        this.mutationState = mutationState == null ? 
newMutationState(maxSize) : new MutationState(mutationState);
+         this.metaData = metaData.pruneTables(pruner);
+         this.metaData = metaData.pruneFunctions(pruner);
          this.services.addConnection(this);
  
          // setup tracing, if its enabled
@@@ -372,14 -406,14 +416,22 @@@
          return mutateBatchSize;
      }
      
+     public PMetaData getMetaDataCache() {
+         return metaData;
+     }
++    
 +    public PTable getTable(PTableKey key) throws TableNotFoundException {
 +      return metaData.getTableRef(key).getTable();
 +    }
 +    
 +    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException 
{
 +      return metaData.getTableRef(key);
 +    }
  
+     protected MutationState newMutationState(int maxSize) {
+         return new MutationState(maxSize, this); 
+     }
+     
      public MutationState getMutationState() {
          return mutationState;
      }
@@@ -647,13 -687,8 +705,14 @@@
  
      @Override
      public void rollback() throws SQLException {
 -        mutationState.rollback(this);
 +        CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() 
{
 +            @Override
 +            public Void call() throws SQLException {
 +                mutationState.rollback();
 +                return null;
 +            }
 +        }, Tracing.withTracing(this, "rolling back"));
+         statementExecutionCounter = 0;
      }
  
      @Override
@@@ -758,29 -800,37 +824,41 @@@
          // TODO Auto-generated method stub
          return 0;
      }
 -
 +    
      @Override
 -    public PMetaData addTable(PTable table) throws SQLException {
 -        // TODO: since a connection is only used by one thread at a time,
 -        // we could modify this metadata in place since it's not shared.
 -        if (scn == null || scn > table.getTimeStamp()) {
 -            metaData = metaData.addTable(table);
 -        }
 +    public PMetaData addTable(PTable table, long resolvedTime) throws 
SQLException {
 +        metaData = metaData.addTable(table, resolvedTime);
          //Cascade through to connectionQueryServices too
 -        getQueryServices().addTable(table);
 +        getQueryServices().addTable(table, resolvedTime);
 +        return metaData;
 +    }
 +    
 +    @Override
 +    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) 
throws SQLException {
 +      metaData = metaData.updateResolvedTimestamp(table, resolvedTime);
 +      //Cascade through to connectionQueryServices too
 +        getQueryServices().updateResolvedTimestamp(table, resolvedTime);
          return metaData;
      }
+     
+     @Override
+     public PMetaData addFunction(PFunction function) throws SQLException {
+         // TODO: since a connection is only used by one thread at a time,
+         // we could modify this metadata in place since it's not shared.
+         if (scn == null || scn > function.getTimeStamp()) {
+             metaData = metaData.addFunction(function);
+         }
+         //Cascade through to connectionQueryServices too
+         getQueryServices().addFunction(function);
+         return metaData;
+     }
  
      @Override
 -    public PMetaData addColumn(PName tenantId, String tableName, 
List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean 
isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean 
storeNulls)
 +    public PMetaData addColumn(PName tenantId, String tableName, 
List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean 
isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean 
storeNulls, long resolvedTime)
              throws SQLException {
 -        metaData = metaData.addColumn(tenantId, tableName, columns, 
tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, 
storeNulls);
 +        metaData = metaData.addColumn(tenantId, tableName, columns, 
tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, 
storeNulls, resolvedTime);
          //Cascade through to connectionQueryServices too
 -        getQueryServices().addColumn(tenantId, tableName, columns, 
tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, 
storeNulls);
 +        getQueryServices().addColumn(tenantId, tableName, columns, 
tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, 
storeNulls, resolvedTime);
          return metaData;
      }
  
@@@ -793,11 -843,19 +871,19 @@@
      }
  
      @Override
+     public PMetaData removeFunction(PName tenantId, String functionName, long 
tableTimeStamp) throws SQLException {
+         metaData = metaData.removeFunction(tenantId, functionName, 
tableTimeStamp);
+         //Cascade through to connectionQueryServices too
+         getQueryServices().removeFunction(tenantId, functionName, 
tableTimeStamp);
+         return metaData;
+     }
+ 
+     @Override
      public PMetaData removeColumn(PName tenantId, String tableName, 
List<PColumn> columnsToRemove, long tableTimeStamp,
 -            long tableSeqNum) throws SQLException {
 -        metaData = metaData.removeColumn(tenantId, tableName, 
columnsToRemove, tableTimeStamp, tableSeqNum);
 +            long tableSeqNum, long resolvedTime) throws SQLException {
 +        metaData = metaData.removeColumn(tenantId, tableName, 
columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
          //Cascade through to connectionQueryServices too
 -        getQueryServices().removeColumn(tenantId, tableName, columnsToRemove, 
tableTimeStamp, tableSeqNum);
 +        getQueryServices().removeColumn(tenantId, tableName, columnsToRemove, 
tableTimeStamp, tableSeqNum, resolvedTime);
          return metaData;
      }
  

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 6376de7,19b34e6..55ae8b3
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@@ -33,16 -34,19 +34,22 @@@ import java.text.Format
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
 +import java.util.Iterator;
  import java.util.List;
+ import java.util.Map;
 +import java.util.Set;
  
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.client.Consistency;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.util.Pair;
  import org.apache.phoenix.call.CallRunner;
 +import org.apache.phoenix.compile.BaseMutationPlan;
  import org.apache.phoenix.compile.ColumnProjector;
  import org.apache.phoenix.compile.ColumnResolver;
+ import org.apache.phoenix.compile.CreateFunctionCompiler;
  import org.apache.phoenix.compile.CreateIndexCompiler;
  import org.apache.phoenix.compile.CreateSequenceCompiler;
  import org.apache.phoenix.compile.CreateTableCompiler;
@@@ -236,14 -260,9 +263,14 @@@ public class PhoenixStatement implement
                      public PhoenixResultSet call() throws SQLException {
                      final long startTime = System.currentTimeMillis();
                      try {
-                         QueryPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
 -                        QueryPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
 -                        plan = 
connection.getQueryServices().getOptimizer().optimize(
 -                                PhoenixStatement.this, plan);
++                                              QueryPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
 +                        // Send mutations to hbase, so they are visible to 
subsequent reads.
 +                        // Use original plan for data table so that data and 
immutable indexes will be sent
 +                        // TODO: for joins, we need to iterate through all 
tables, but we need the original table,
 +                        // not the projected table, so 
plan.getContext().getResolver().getTables() won't work.
 +                        Iterator<TableRef> tableRefs = 
plan.getSourceRefs().iterator();
 +                        
connection.getMutationState().sendUncommitted(tableRefs);
 +                        plan = 
connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, 
plan);
                           // this will create its own trace internally, so we 
don't wrap this
                           // whole thing in tracing
                          ResultIterator resultIterator = plan.iterator();
@@@ -257,10 -278,7 +286,11 @@@
                          setLastResultSet(rs);
                          setLastUpdateCount(NO_UPDATE);
                          setLastUpdateOperation(stmt.getOperation());
 +                        // If transactional, this will move the read pointer 
forward
 +                        if (connection.getAutoCommit()) {
 +                            connection.commit();
 +                        }
+                         connection.incrementStatementExecutionCounter();
                          return rs;
                      } catch (RuntimeException e) {
                          // FIXME: Expression.evaluate does not throw 
SQLException
@@@ -296,17 -314,13 +326,17 @@@
                          new CallRunner.CallableThrowable<Integer, 
SQLException>() {
                          @Override
                              public Integer call() throws SQLException {
 -                            // Note that the upsert select statements will 
need to commit any open transaction here,
 -                            // since they'd update data directly from 
coprocessors, and should thus operate on
 -                            // the latest state
                              try {
 +                                MutationState state = 
connection.getMutationState();
-                                 MutationPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
+                                 MutationPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
 -                                MutationState state = plan.execute();
 -                                connection.getMutationState().join(state);
 +                                if (plan.getTargetRef() != null && 
plan.getTargetRef().getTable() != null && 
plan.getTargetRef().getTable().isTransactional()) {
 +                                    state.startTransaction();
 +                                }
 +                                Iterator<TableRef> tableRefs = 
plan.getSourceRefs().iterator();
 +                                state.sendUncommitted(tableRefs);
 +                                state.checkpoint(plan);
 +                                MutationState lastState = plan.execute();
 +                                state.join(lastState);
                                  if (connection.getAutoCommit()) {
                                      connection.commit();
                                  }
@@@ -314,9 -328,10 +344,10 @@@
                                  setLastQueryPlan(null);
                                  // Unfortunately, JDBC uses an int for update 
count, so we
                                  // just max out at Integer.MAX_VALUE
 -                                int lastUpdateCount = (int) 
Math.min(Integer.MAX_VALUE, state.getUpdateCount());
 +                                int lastUpdateCount = (int) 
Math.min(Integer.MAX_VALUE, lastState.getUpdateCount());
                                  setLastUpdateCount(lastUpdateCount);
                                  setLastUpdateOperation(stmt.getOperation());
+                                 
connection.incrementStatementExecutionCounter();
                                  return lastUpdateCount;
                              } catch (RuntimeException e) {
                                  // FIXME: Expression.evaluate does not throw 
SQLException
@@@ -504,10 -529,10 +550,14 @@@
                      return true;
                  }
  
 +                              @Override
 +                              public Operation getOperation() {
                                        // NOTE(review): `this` is the object that declares this very method, so
                                        // the call below dispatches back into getOperation() (unless a subclass
                                        // overrides it) and recurses without a base case. Presumably the
                                        // enclosing statement's getOperation() was intended, i.e.
                                        // <EnclosingClass>.this.getOperation() -- confirm against the full file.
 +                                      return this.getOperation();
 +                              }
+                 @Override
+                 public boolean useRoundRobinIterator() throws SQLException {
                      // Unconditionally false for this plan; no state is consulted.
+                     return false;
+                 }
                  
              };
          }
@@@ -521,7 -546,10 +571,10 @@@
          @SuppressWarnings("unchecked")
          @Override
          public MutationPlan compilePlan(PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
-             UpsertCompiler compiler = new UpsertCompiler(stmt, 
this.getOperation());
+             if(!getUdfParseNodes().isEmpty()) {
+                 stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
+             }
 -            UpsertCompiler compiler = new UpsertCompiler(stmt);
++                      UpsertCompiler compiler = new UpsertCompiler(stmt, 
this.getOperation());
              MutationPlan plan = compiler.compile(this);
              
plan.getContext().getSequenceManager().validateSequences(seqAction);
              return plan;
@@@ -536,7 -564,10 +589,10 @@@
          @SuppressWarnings("unchecked")
          @Override
          public MutationPlan compilePlan(PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
-             DeleteCompiler compiler = new DeleteCompiler(stmt, 
this.getOperation());
+             if(!getUdfParseNodes().isEmpty()) {
+                 stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
+             }
 -            DeleteCompiler compiler = new DeleteCompiler(stmt);
++                  DeleteCompiler compiler = new DeleteCompiler(stmt, 
this.getOperation());
              MutationPlan plan = compiler.compile(this);
              
plan.getContext().getSequenceManager().validateSequences(seqAction);
              return plan;
@@@ -558,6 -589,209 +614,179 @@@
          }
      }
  
+     private static class ExecutableCreateFunctionStatement extends 
CreateFunctionStatement implements CompilableStatement {
          // Executable form of the CREATE FUNCTION statement: adds a compilePlan()
          // that hands the statement to CreateFunctionCompiler.
+ 
+         public ExecutableCreateFunctionStatement(PFunction functionInfo, 
boolean temporary, boolean isReplace) {
              // All three arguments are forwarded unchanged to the parent statement.
+             super(functionInfo, temporary, isReplace);
+         }
+ 
+ 
          // Compiles this statement into a MutationPlan.
          //   stmt      - statement used for the UDF permission check and passed to the compiler
          //   seqAction - not read anywhere in this method
          // The raw Collections.EMPTY_MAP argument is presumably why the unchecked
          // warning is suppressed -- TODO confirm.
+         @SuppressWarnings("unchecked")
+         @Override
+         public MutationPlan compilePlan(final PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
              // Called with an empty map, i.e. no specific UDF parse nodes are supplied.
+             stmt.throwIfUnallowedUserDefinedFunctions(Collections.EMPTY_MAP);
+             CreateFunctionCompiler compiler = new 
CreateFunctionCompiler(stmt);
+             return compiler.compile(this);
+         }
+     }
+ 
+     private static class ExecutableDropFunctionStatement extends 
DropFunctionStatement implements CompilableStatement {
          // Executable form of the DROP FUNCTION statement. compilePlan() returns an
          // anonymous plan whose execute() delegates to MetaDataClient.dropFunction().
+ 
+         public ExecutableDropFunctionStatement(String functionName, boolean 
ifNotExists) {
              // NOTE(review): the flag is named ifNotExists although this is a DROP
              // statement; its meaning is defined by DropFunctionStatement, which is
              // not visible here -- confirm the intended semantics.
+             super(functionName, ifNotExists);
+         }
+ 
          // Builds the mutation plan for dropping the function.
          //   stmt      - statement used to create the StatementContext
          //   seqAction - not read anywhere in this method
+         @SuppressWarnings("unchecked")
+         @Override
+         public MutationPlan compilePlan(final PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
+                 final StatementContext context = new StatementContext(stmt);
 -                return new MutationPlan() {
 -
 -                    @Override
 -                    public StatementContext getContext() {
 -                        return context;
 -                    }
++                return new BaseMutationPlan(context, this.getOperation()) {
+ 
                      // No bind parameters are reported for this statement.
+                     @Override
+                     public ParameterMetaData getParameterMetaData() {
+                         return 
PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                     }
+ 
                      // Single-step explain output labelled "DROP FUNCTION".
+                     @Override
+                     public ExplainPlan getExplainPlan() throws SQLException {
 -                        return new 
ExplainPlan(Collections.singletonList("DROP TABLE"));
 -                    }
 -
 -                    @Override
 -                    public PhoenixConnection getConnection() {
 -                        return stmt.getConnection();
++                        return new 
ExplainPlan(Collections.singletonList("DROP FUNCTION"));
+                     }
+ 
                      // Performs the drop through a MetaDataClient built on the context's
                      // connection and returns whatever MutationState dropFunction() yields.
+                     @Override
+                     public MutationState execute() throws SQLException {
 -                        MetaDataClient client = new 
MetaDataClient(getConnection());
++                        MetaDataClient client = new 
MetaDataClient(getContext().getConnection());
+                         return 
client.dropFunction(ExecutableDropFunctionStatement.this);
+                     }
+                 };
+         }
+     }
+     
+     private static class ExecutableAddJarsStatement extends AddJarsStatement 
implements CompilableStatement {
          // Executable form of the ADD JARS statement. The plan's execute() copies
          // each listed local jar file into the directory configured under
          // QueryServices.DYNAMIC_JARS_DIR_KEY.
+ 
+         public ExecutableAddJarsStatement(List<LiteralParseNode> jarPaths) {
+             super(jarPaths);
+         }
+ 
+ 
          // Builds the mutation plan for ADD JARS.
          //   stmt      - supplies the connection whose properties hold the jars directory
          //   seqAction - not read anywhere in this method
+         @SuppressWarnings("unchecked")
+         @Override
+         public MutationPlan compilePlan(final PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
+             final StatementContext context = new StatementContext(stmt);
 -            return new MutationPlan() {
 -
 -                @Override
 -                public StatementContext getContext() {
 -                    return context;
 -                }
++            return new BaseMutationPlan(context, this.getOperation()) {
+ 
                  // No bind parameters are reported for this statement.
+                 @Override
+                 public ParameterMetaData getParameterMetaData() {
+                     return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                 }
+ 
                  // Single-step explain output labelled "ADD JARS".
+                 @Override
+                 public ExplainPlan getExplainPlan() throws SQLException {
+                     return new ExplainPlan(Collections.singletonList("ADD 
JARS"));
+                 }
+ 
+                 @Override
 -                public PhoenixConnection getConnection() {
 -                    return stmt.getConnection();
 -                }
 -
 -                @Override
                  // Copies every jar named in the statement into the dynamic jars directory.
                  // Throws SQLException when the directory property is unset, when a path
                  // does not end in ".jar", or (wrapping the IOException) when a copy fails.
                  // Returns a MutationState created with 0 as its first argument.
+                 public MutationState execute() throws SQLException {
+                     String dynamicJarsDir = 
stmt.getConnection().getQueryServices().getProps().get(QueryServices.DYNAMIC_JARS_DIR_KEY);
+                     if(dynamicJarsDir == null) {
+                         throw new 
SQLException(QueryServices.DYNAMIC_JARS_DIR_KEY+" is not configured for placing 
the jars.");
+                     }
                      // Normalise to a trailing '/' so the file name can be appended directly below.
+                     dynamicJarsDir =
+                             dynamicJarsDir.endsWith("/") ? dynamicJarsDir : 
dynamicJarsDir + '/';
+                     Configuration conf = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+                     Path dynamicJarsDirPath = new Path(dynamicJarsDir);
                      // Validate every path up front so nothing is copied if any entry is rejected.
+                     for (LiteralParseNode jarPath : getJarPaths()) {
+                         String jarPathStr = (String)jarPath.getValue();
+                         if(!jarPathStr.endsWith(".jar")) {
+                             throw new SQLException(jarPathStr + " is not a 
valid jar file path.");
+                         }
+                     }
+ 
+                     try {
+                         FileSystem fs = 
dynamicJarsDirPath.getFileSystem(conf);
+                         List<LiteralParseNode> jarPaths = getJarPaths();
+                         for (LiteralParseNode jarPath : jarPaths) {
                              // Source is treated as a local file; only its base name is kept at the target.
+                             File f = new File((String) jarPath.getValue());
+                             fs.copyFromLocalFile(new 
Path(f.getAbsolutePath()), new Path(
+                                     dynamicJarsDir + f.getName()));
+                         }
+                     } catch(IOException e) {
+                         throw new SQLException(e);
+                     }
+                     return new MutationState(0, context.getConnection());
+                 }
+             };
+             
+         }
+     }
+ 
+     private static class ExecutableDeleteJarStatement extends 
DeleteJarStatement implements CompilableStatement {
          // Executable form of the DELETE JAR statement. The plan's execute() deletes
          // the jar named in the statement from the file system that backs the
          // directory configured under QueryServices.DYNAMIC_JARS_DIR_KEY.
+ 
+         public ExecutableDeleteJarStatement(LiteralParseNode jarPath) {
+             super(jarPath);
+         }
+ 
+ 
          // Builds the mutation plan for DELETE JAR.
          //   stmt      - supplies the connection whose properties hold the jars directory
          //   seqAction - not read anywhere in this method
+         @SuppressWarnings("unchecked")
+         @Override
+         public MutationPlan compilePlan(final PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
+             final StatementContext context = new StatementContext(stmt);
 -            return new MutationPlan() {
 -
 -                @Override
 -                public StatementContext getContext() {
 -                    return context;
 -                }
++            return new BaseMutationPlan(context, this.getOperation()) {
+ 
                  // No bind parameters are reported for this statement.
+                 @Override
+                 public ParameterMetaData getParameterMetaData() {
+                     return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                 }
+ 
                  // Single-step explain output labelled "DELETE JAR".
+                 @Override
+                 public ExplainPlan getExplainPlan() throws SQLException {
+                     return new ExplainPlan(Collections.singletonList("DELETE 
JAR"));
+                 }
+ 
+                 @Override
 -                public PhoenixConnection getConnection() {
 -                    return stmt.getConnection();
 -                }
 -
 -                @Override
                  // Deletes the named jar if it exists; a missing file is not an error.
                  // Throws SQLException when the directory property is unset, when the
                  // path does not end in ".jar", or (wrapping the IOException) on I/O failure.
                  // Returns a MutationState created with 0 as its first argument.
+                 public MutationState execute() throws SQLException {
+                     String dynamicJarsDir = 
stmt.getConnection().getQueryServices().getProps().get(QueryServices.DYNAMIC_JARS_DIR_KEY);
+                     if (dynamicJarsDir == null) {
+                         throw new 
SQLException(QueryServices.DYNAMIC_JARS_DIR_KEY
+                                 + " is not configured.");
+                     }
+                     dynamicJarsDir =
+                             dynamicJarsDir.endsWith("/") ? dynamicJarsDir : 
dynamicJarsDir + '/';
+                     Configuration conf = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+                     Path dynamicJarsDirPath = new Path(dynamicJarsDir);
+                     try {
                          // The jars directory is used only to obtain the FileSystem handle.
+                         FileSystem fs = 
dynamicJarsDirPath.getFileSystem(conf);
+                         String jarPathStr = (String)getJarPath().getValue();
+                         if(!jarPathStr.endsWith(".jar")) {
+                             throw new SQLException(jarPathStr + " is not a 
valid jar file path.");
+                         }
                          // NOTE(review): the path is taken verbatim from the statement and is not
                          // checked to lie under dynamicJarsDir, so any "*.jar" reachable on this
                          // file system can be removed -- confirm whether that is intended.
+                         Path p = new Path(jarPathStr);
+                         if(fs.exists(p)) {
                              // Second argument false: non-recursive delete.
+                             fs.delete(p, false);
+                         }
+                     } catch(IOException e) {
+                         throw new SQLException(e);
+                     }
+                     return new MutationState(0, context.getConnection());
+                 }
+             };
+             
+         }
+     }
+ 
+     private static class ExecutableListJarsStatement extends 
ListJarsStatement implements CompilableStatement {
          // Executable form of the LIST JARS statement; unlike the jar-mutating
          // statements it compiles to a QueryPlan rather than a MutationPlan.
+ 
+         public ExecutableListJarsStatement() {
+             super();
+         }
+ 
+ 
          // Returns a new ListJarsQueryPlan built from stmt; seqAction is not read.
+         @SuppressWarnings("unchecked")
+         @Override
+         public QueryPlan compilePlan(final PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
+             return new ListJarsQueryPlan(stmt);
+         }
+     }
+ 
      private static class ExecutableCreateIndexStatement extends 
CreateIndexStatement implements CompilableStatement {
  
          public ExecutableCreateIndexStatement(NamedNode indexName, 
NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> 
includeColumns, List<ParseNode> splits,
@@@ -568,7 -802,10 +797,10 @@@
          @SuppressWarnings("unchecked")
          @Override
          public MutationPlan compilePlan(PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
-             CreateIndexCompiler compiler = new CreateIndexCompiler(stmt, 
this.getOperation());
+             if(!getUdfParseNodes().isEmpty()) {
+                 stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
+             }
 -            CreateIndexCompiler compiler = new CreateIndexCompiler(stmt);
++                      CreateIndexCompiler compiler = new 
CreateIndexCompiler(stmt, this.getOperation());
              return compiler.compile(this);
          }
      }
@@@ -695,10 -978,58 +927,54 @@@
          }
      }
  
+     private static class ExecutableAlterSessionStatement extends 
AlterSessionStatement implements CompilableStatement {
          // Executable form of the ALTER SESSION statement. The plan's execute()
          // applies the consistency property, if present, to the connection.
+ 
+         public ExecutableAlterSessionStatement(Map<String,Object> props) {
+             super(props);
+         }
+ 
          // Builds the mutation plan for ALTER SESSION.
          //   stmt      - statement used to create the StatementContext
          //   seqAction - not read anywhere in this method
+         @SuppressWarnings("unchecked")
+         @Override
+         public MutationPlan compilePlan(final PhoenixStatement stmt, 
Sequence.ValueOp seqAction) throws SQLException {
+             final StatementContext context = new StatementContext(stmt);
 -            return new MutationPlan() {
++            return new BaseMutationPlan(context, this.getOperation()) {
+ 
                  // Returns the context captured from the enclosing compilePlan() call.
+                 @Override
+                 public StatementContext getContext() {
+                     return context;
+                 }
+ 
                  // No bind parameters are reported for this statement.
+                 @Override
+                 public ParameterMetaData getParameterMetaData() {
+                     return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                 }
+ 
                  // Single-step explain output labelled "ALTER SESSION".
+                 @Override
+                 public ExplainPlan getExplainPlan() throws SQLException {
+                     return new ExplainPlan(Collections.singletonList("ALTER 
SESSION"));
+                 }
+ 
 -                @Override
 -                public PhoenixConnection getConnection() {
 -                    return stmt.getConnection();
 -                }
+ 
                  // Applies the session consistency level when the property is present:
                  // a value equal (ignoring case) to Consistency.TIMELINE selects TIMELINE,
                  // any other non-null value selects STRONG. A missing property leaves the
                  // connection untouched. Returns a MutationState created with 0 as its
                  // first argument.
+                 @Override
+                 public MutationState execute() throws SQLException {
                      // The property is looked up under the upper-cased attribute name.
+                     Object consistency = 
getProps().get(PhoenixRuntime.CONSISTENCY_ATTRIB.toUpperCase());
+                     if(consistency != null) {
                          // NOTE(review): the unchecked (String) cast fails with ClassCastException
                          // if the property value is not a String -- confirm the parser guarantees it.
+                         if 
(((String)consistency).equalsIgnoreCase(Consistency.TIMELINE.toString())){
 -                            
getConnection().setConsistency(Consistency.TIMELINE);
++                            
getContext().getConnection().setConsistency(Consistency.TIMELINE);
+                         } else {
 -                            
getConnection().setConsistency(Consistency.STRONG);
++                              
getContext().getConnection().setConsistency(Consistency.STRONG);
+                         }
+                     }
+                     return new MutationState(0, context.getConnection());
+                 }
+             };
+         }
+     }
+ 
      private static class ExecutableUpdateStatisticsStatement extends 
UpdateStatisticsStatement implements
              CompilableStatement {
-         public ExecutableUpdateStatisticsStatement(NamedTableNode table, 
StatisticsCollectionScope scope) {
-             super(table, scope);
+         public ExecutableUpdateStatisticsStatement(NamedTableNode table, 
StatisticsCollectionScope scope, Map<String,Object> props) {
+             super(table, scope, props);
          }
  
          @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 5500ee8,b500a25..d2f91b3
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@@ -99,8 -104,12 +104,12 @@@ public class PhoenixRecordReader<T exte
          final List<Scan> scans = pSplit.getScans();
          try {
              List<PeekingResultIterator> iterators = 
Lists.newArrayListWithExpectedSize(scans.size());
+             StatementContext ctx = queryPlan.getContext();
+             ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+             String tableName = 
queryPlan.getTableRef().getTable().getPhysicalName().getString();
              for (Scan scan : scans) {
-                 final TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), 
queryPlan.getTableRef(),scan);
 -                final TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext(),
++                final TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext().getConnection().getMutationState(),
+                         queryPlan.getTableRef(), scan, 
readMetrics.allotMetric(SCAN_BYTES, tableName));
                  PeekingResultIterator peekingResultIterator = 
LookAheadResultIterator.wrap(tableResultIterator);
                  iterators.add(peekingResultIterator);
              }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index 0000000,32b66f1..3bc3808
mode 000000,100644..100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@@ -1,0 -1,182 +1,182 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.mapreduce.index;
+ 
+ import java.io.IOException;
+ import java.sql.Connection;
+ import java.sql.PreparedStatement;
+ import java.sql.SQLException;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Properties;
+ import java.util.UUID;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hbase.client.Mutation;
+ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.hadoop.hbase.util.Pair;
+ import org.apache.hadoop.io.IntWritable;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.mapreduce.Mapper;
+ import org.apache.phoenix.execute.MutationState;
+ import org.apache.phoenix.jdbc.PhoenixConnection;
+ import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+ import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+ import org.apache.phoenix.query.ConnectionQueryServices;
+ import org.apache.phoenix.query.QueryServices;
+ import org.apache.phoenix.query.QueryServicesOptions;
+ import org.apache.phoenix.util.ColumnInfo;
+ import org.apache.phoenix.util.PhoenixRuntime;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Mapper that hands over rows from data table to the index table.
+  */
+ public class PhoenixIndexImportDirectMapper extends
+         Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, 
IntWritable> {
+ 
+     private static final Logger LOG = 
LoggerFactory.getLogger(PhoenixIndexImportDirectMapper.class);
+ 
+     private final PhoenixIndexDBWritable indxWritable = new 
PhoenixIndexDBWritable();
+ 
+     private List<ColumnInfo> indxTblColumnMetadata;
+ 
+     private Connection connection;
+ 
+     private PreparedStatement pStatement;
+ 
+     private DirectHTableWriter writer;
+ 
+     private int batchSize;
+ 
+     private MutationState mutationState;
+ 
+     @Override
+     protected void setup(final Context context) throws IOException, 
InterruptedException {
+         super.setup(context);
+         final Configuration configuration = context.getConfiguration();
+         writer = new DirectHTableWriter(configuration);
+ 
+         try {
+             indxTblColumnMetadata =
+                     
PhoenixConfigurationUtil.getUpsertColumnMetadataList(configuration);
+             indxWritable.setColumnMetadata(indxTblColumnMetadata);
+ 
+             final Properties overrideProps = new Properties();
+             overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB,
+                 
configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+             connection = ConnectionUtil.getOutputConnection(configuration, 
overrideProps);
+             connection.setAutoCommit(false);
+             // Get BatchSize
+             ConnectionQueryServices services = ((PhoenixConnection) 
connection).getQueryServices();
+             int maxSize =
+                     
services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+                         QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+             batchSize = Math.min(((PhoenixConnection) 
connection).getMutateBatchSize(), maxSize);
+             LOG.info("Mutation Batch Size = " + batchSize);
+ 
+             final String upsertQuery = 
PhoenixConfigurationUtil.getUpsertStatement(configuration);
+             this.pStatement = connection.prepareStatement(upsertQuery);
+ 
+         } catch (SQLException e) {
+             throw new RuntimeException(e.getMessage());
+         }
+     }
+ 
+     @Override
+     protected void map(NullWritable key, PhoenixIndexDBWritable record, 
Context context)
+             throws IOException, InterruptedException {
+ 
+         context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+ 
+         try {
+             final List<Object> values = record.getValues();
+             indxWritable.setValues(values);
+             indxWritable.write(this.pStatement);
+             this.pStatement.execute();
+ 
+             final PhoenixConnection pconn = 
connection.unwrap(PhoenixConnection.class);
+             MutationState currentMutationState = pconn.getMutationState();
+             if (mutationState == null) {
+                 mutationState = currentMutationState;
+                 return;
+             }
+             // Keep accumulating Mutations till batch size
+             mutationState.join(currentMutationState);
+ 
+             // Write Mutation Batch
+             if 
(context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 
0) {
+                 writeBatch(mutationState, context);
+                 mutationState = null;
+             }
+ 
+             // Make sure progress is reported to Application Master.
+             context.progress();
+         } catch (SQLException e) {
+             LOG.error(" Error {}  while read/write of a record ", 
e.getMessage());
+             
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     private void writeBatch(MutationState mutationState, Context context) 
throws IOException,
+             SQLException, InterruptedException {
 -        final Iterator<Pair<byte[], List<Mutation>>> iterator = 
mutationState.toMutations(true);
++        final Iterator<Pair<byte[], List<Mutation>>> iterator = 
mutationState.toMutations(true, null);
+         while (iterator.hasNext()) {
+             Pair<byte[], List<Mutation>> mutationPair = iterator.next();
+ 
+             writer.write(mutationPair.getSecond());
+             context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(
+                 mutationPair.getSecond().size());
+         }
+         connection.rollback();
+     }
+ 
+     @Override
+     protected void cleanup(Context context) throws IOException, 
InterruptedException {
+         try {
+             // Write the last & final Mutation Batch
+             if (mutationState != null) {
+                 writeBatch(mutationState, context);
+             }
+             // We are writing some dummy key-value as map output here so that 
we commit only one
+             // output to reducer.
+             context.write(new 
ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+                 new IntWritable(0));
+             super.cleanup(context);
+         } catch (SQLException e) {
+             LOG.error(" Error {}  while read/write of a record ", 
e.getMessage());
+             
context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+             throw new RuntimeException(e);
+         } finally {
+             if (connection != null) {
+                 try {
+                     connection.close();
+                 } catch (SQLException e) {
+                     LOG.error("Error {} while closing connection in the 
PhoenixIndexMapper class ",
+                         e.getMessage());
+                 }
+             }
+             if (writer != null) {
+                 writer.close();
+             }
+         }
+     }
+ }

Reply via email to