http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 2da5771,e4bc193..c4ed7a0
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@@ -34,8 -42,7 +34,10 @@@ import org.apache.phoenix.hbase.index.c
  import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
  import org.apache.phoenix.hbase.index.scanner.Scanner;
  import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
 +import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 +
++import com.google.inject.Key;
+ 
  /**
   * Manage the state of the HRegion's view of the table, for the single row.
   * <p>
@@@ -46,230 -55,190 +48,230 @@@
   */
  public class LocalTableState implements TableState {
  
 -  private long ts;
 -  private RegionCoprocessorEnvironment env;
 -  private KeyValueStore memstore;
 -  private LocalHBaseState table;
 -  private Mutation update;
 -  private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
 -  private ScannerBuilder scannerBuilder;
 -  private List<KeyValue> kvs = new ArrayList<KeyValue>();
 -  private List<? extends IndexedColumnGroup> hints;
 -  private CoveredColumns columnSet;
 -
 -  public LocalTableState(RegionCoprocessorEnvironment environment, 
LocalHBaseState table, Mutation update) {
 -    this.env = environment;
 -    this.table = table;
 -    this.update = update;
 -    this.memstore = new IndexMemStore();
 -    this.scannerBuilder = new ScannerBuilder(memstore, update);
 -    this.columnSet = new CoveredColumns();
 -  }
 -
 -  public void addPendingUpdates(KeyValue... kvs) {
 -    if (kvs == null) return;
 -    addPendingUpdates(Arrays.asList(kvs));
 -  }
 -
 -  public void addPendingUpdates(List<KeyValue> kvs) {
 -    if(kvs == null) return;
 -    setPendingUpdates(kvs);
 -    addUpdate(kvs);
 -  }
 -
 -  private void addUpdate(List<KeyValue> list) {
 -    addUpdate(list, true);
 -  }
 -
 -  private void addUpdate(List<KeyValue> list, boolean overwrite) {
 -    if (list == null) return;
 -    for (KeyValue kv : list) {
 -      this.memstore.add(kv, overwrite);
 -    }
 -  }
 -
 -  @Override
 -  public RegionCoprocessorEnvironment getEnvironment() {
 -    return this.env;
 -  }
 -
 -  @Override
 -  public long getCurrentTimestamp() {
 -    return this.ts;
 -  }
 -
 -  @Override
 -  public void setCurrentTimestamp(long timestamp) {
 -    this.ts = timestamp;
 -  }
 -
 -  public void resetTrackedColumns() {
 -    this.trackedColumns.clear();
 -  }
 -
 -  public Set<ColumnTracker> getTrackedColumns() {
 -    return this.trackedColumns;
 -  }
 -
 -  @Override
 -  public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
 -      Collection<? extends ColumnReference> indexedColumns) throws 
IOException {
 -    ensureLocalStateInitialized(indexedColumns);
 -    // filter out things with a newer timestamp and track the column 
references to which it applies
 -    ColumnTracker tracker = new ColumnTracker(indexedColumns);
 -    synchronized (this.trackedColumns) {
 -      // we haven't seen this set of columns before, so we need to create a 
new tracker
 -      if (!this.trackedColumns.contains(tracker)) {
 -        this.trackedColumns.add(tracker);
 -      }
 -    }
 -
 -    Scanner scanner =
 -        this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, 
tracker, ts);
 -
 -    return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
 -  }
 -
 -  /**
 -   * Initialize the managed local state. Generally, this will only be called 
by
 -   * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be 
called concurrently from the outside.
 -   * Even then, there is still fairly low contention as each new Put/Delete 
will have its own table
 -   * state.
 -   */
 -  private synchronized void ensureLocalStateInitialized(
 -      Collection<? extends ColumnReference> columns) throws IOException {
 -    // check to see if we haven't initialized any columns yet
 -    Collection<? extends ColumnReference> toCover = 
this.columnSet.findNonCoveredColumns(columns);
 -    // we have all the columns loaded, so we are good to go.
 -    if (toCover.isEmpty()) {
 -      return;
 -    }
 -
 -    // add the current state of the row
 -    this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), 
false);
 -
 -    // add the covered columns to the set
 -    for (ColumnReference ref : toCover) {
 -      this.columnSet.addColumn(ref);
 -    }
 -  }
 -
 -  @Override
 -  public Map<String, byte[]> getUpdateAttributes() {
 -    return this.update.getAttributesMap();
 -  }
 -
 -  @Override
 -  public byte[] getCurrentRowKey() {
 -    return this.update.getRow();
 -  }
 -
 -  public Result getCurrentRowState() {
 -    KeyValueScanner scanner = this.memstore.getScanner();
 -    List<Cell> kvs = new ArrayList<Cell>();
 -    while (scanner.peek() != null) {
 -      try {
 -        kvs.add(scanner.next());
 -      } catch (IOException e) {
 -        // this should never happen - something has gone terribly awry if it 
has
 -        throw new RuntimeException("Local MemStore threw IOException!");
 -      }
 -    }
 -    return Result.create(kvs);
 -  }
 -
 -  /**
 -   * Helper to add a {@link Mutation} to the values stored for the current row
 -   * @param pendingUpdate update to apply
 -   */
 -  public void addUpdateForTesting(Mutation pendingUpdate) {
 -    for (Map.Entry<byte[], List<Cell>> e : 
pendingUpdate.getFamilyCellMap().entrySet()) {
 -      List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
 -      addUpdate(edits);
 -    }
 -  }
 -
 -  /**
 -   * @param hints
 -   */
 -  public void setHints(List<? extends IndexedColumnGroup> hints) {
 -    this.hints = hints;
 -  }
 -
 -  @Override
 -  public List<? extends IndexedColumnGroup> getIndexColumnHints() {
 -    return this.hints;
 -  }
 -
 -  @Override
 -  public Collection<KeyValue> getPendingUpdate() {
 -    return this.kvs;
 -  }
 -
 -  /**
 -   * Set the {@link KeyValue}s in the update for which we are currently 
building an index update,
 -   * but don't actually apply them.
 -   * @param update pending {@link KeyValue}s
 -   */
 -  public void setPendingUpdates(Collection<KeyValue> update) {
 -    this.kvs.clear();
 -    this.kvs.addAll(update);
 -  }
 -
 -  /**
 -   * Apply the {@link KeyValue}s set in {@link 
#setPendingUpdates(Collection)}.
 -   */
 -  public void applyPendingUpdates() {
 -    this.addUpdate(kvs);
 -  }
 -
 -  /**
 -   * Rollback all the given values from the underlying state.
 -   * @param values
 -   */
 -  public void rollback(Collection<KeyValue> values) {
 -    for (KeyValue kv : values) {
 -      this.memstore.rollback(kv);
 -    }
 -  }
 +    private long ts;
 +    private RegionCoprocessorEnvironment env;
 +    private KeyValueStore memstore;
 +    private LocalHBaseState table;
 +    private Mutation update;
 +    private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
 +    private ScannerBuilder scannerBuilder;
-     private List<Cell> kvs = new ArrayList<Cell>();
++    private List<KeyValue> kvs = new ArrayList<KeyValue>();
 +    private List<? extends IndexedColumnGroup> hints;
 +    private CoveredColumns columnSet;
 +
 +    public LocalTableState(RegionCoprocessorEnvironment environment, 
LocalHBaseState table, Mutation update) {
 +        this.env = environment;
 +        this.table = table;
 +        this.update = update;
 +        this.memstore = new IndexMemStore();
 +        this.scannerBuilder = new ScannerBuilder(memstore, update);
 +        this.columnSet = new CoveredColumns();
 +    }
 +
-     public void addPendingUpdates(Cell... kvs) {
++    public void addPendingUpdates(KeyValue... kvs) {
 +        if (kvs == null) return;
 +        addPendingUpdates(Arrays.asList(kvs));
 +    }
 +
-     public void addPendingUpdates(List<Cell> kvs) {
++    public void addPendingUpdates(List<KeyValue> kvs) {
 +        if (kvs == null) return;
 +        setPendingUpdates(kvs);
 +        addUpdate(kvs);
 +    }
 +
-     private void addUpdate(List<Cell> list) {
++    private void addUpdate(List<KeyValue> list) {
 +        addUpdate(list, true);
 +    }
 +
-     private void addUpdate(List<Cell> list, boolean overwrite) {
++    private void addUpdate(List<KeyValue> list, boolean overwrite) {
 +        if (list == null) return;
-         for (Cell kv : list) {
-             this.memstore.add(KeyValueUtil.ensureKeyValue(kv), overwrite);
++        for (KeyValue kv : list) {
++            this.memstore.add(kv, overwrite);
 +        }
 +    }
 +
 +    @Override
 +    public RegionCoprocessorEnvironment getEnvironment() {
 +        return this.env;
 +    }
 +
 +    @Override
 +    public long getCurrentTimestamp() {
 +        return this.ts;
 +    }
 +
 +    /**
 +     * Set the current timestamp up to which the table should allow access to 
the underlying table.
 +     * This overrides the timestamp view provided by the indexer - use with 
care!
 +     * @param timestamp timestamp up to which the table should allow access.
 +     */
 +    public void setCurrentTimestamp(long timestamp) {
 +        this.ts = timestamp;
 +    }
 +
 +    public void resetTrackedColumns() {
 +        this.trackedColumns.clear();
 +    }
 +
 +    public Set<ColumnTracker> getTrackedColumns() {
 +        return this.trackedColumns;
 +    }
 +
 +    /**
 +     * Get a scanner on the columns that are needed by the index.
 +     * <p>
 +     * The returned scanner is already pre-seeked to the first {@link 
KeyValue} that matches the given
 +     * columns with a timestamp earlier than the timestamp to which the table 
is currently set (the
 +     * current state of the table for which we need to build an update).
 +     * <p>
 +     * If none of the passed columns matches any of the columns in the 
pending update (as determined
 +     * by {@link ColumnReference#matchesFamily(byte[])} and
 +     * {@link ColumnReference#matchesQualifier(byte[])}, then an empty 
scanner will be returned. This
 +     * is because it doesn't make sense to build index updates when there is 
no change in the table
 +     * state for any of the columns you are indexing.
 +     * <p>
 +     * <i>NOTE:</i> This method should <b>not</b> be used during
 +     * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the 
pending update will not yet have been
 +     * applied - you are merely attempting to cleanup the current state and 
therefore do <i>not</i>
 +     * need to track the indexed columns.
 +     * <p>
 +     * As a side-effect, we update a timestamp for the next-most-recent 
timestamp for the columns you
 +     * request - you will never see a column with the timestamp we are 
tracking, but the next oldest
 +     * timestamp for that column.
 +     * @param indexedColumns the columns to that will be indexed
 +     * @return an iterator over the columns and the {@link IndexUpdate} that 
should be passed back to
 +     *         the builder. Even if no update is necessary for the requested 
columns, you still need
 +     *         to return the {@link IndexUpdate}, just don't set the update 
for the
 +     *         {@link IndexUpdate}.
 +     * @throws IOException
 +     */
 +    public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
 +        Collection<? extends ColumnReference> indexedColumns) throws 
IOException {
 +        ensureLocalStateInitialized(indexedColumns);
 +        // filter out things with a newer timestamp and track the column 
references to which it applies
 +        ColumnTracker tracker = new ColumnTracker(indexedColumns);
 +        synchronized (this.trackedColumns) {
 +            // we haven't seen this set of columns before, so we need to 
create a new tracker
 +            if (!this.trackedColumns.contains(tracker)) {
 +                this.trackedColumns.add(tracker);
 +            }
 +        }
 +
 +        Scanner scanner = 
this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
 +
 +        return new Pair<Scanner, IndexUpdate>(scanner, new 
IndexUpdate(tracker));
 +    }
 +
 +    /**
 +     * Initialize the managed local state. Generally, this will only be 
called by
 +     * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be 
called concurrently from the outside. Even
 +     * then, there is still fairly low contention as each new Put/Delete will 
have its own table state.
 +     */
 +    private synchronized void ensureLocalStateInitialized(Collection<? 
extends ColumnReference> columns)
 +            throws IOException {
 +        // check to see if we haven't initialized any columns yet
 +        Collection<? extends ColumnReference> toCover = 
this.columnSet.findNonCoveredColumns(columns);
 +        // we have all the columns loaded, so we are good to go.
 +        if (toCover.isEmpty()) { return; }
 +
 +        // add the current state of the row
-         this.addUpdate(this.table.getCurrentRowState(update, 
toCover).listCells(), false);
++        this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), 
false);
 +
 +        // add the covered columns to the set
 +        for (ColumnReference ref : toCover) {
 +            this.columnSet.addColumn(ref);
 +        }
 +    }
 +
 +    @Override
 +    public Map<String, byte[]> getUpdateAttributes() {
 +        return this.update.getAttributesMap();
 +    }
 +
 +    @Override
 +    public byte[] getCurrentRowKey() {
 +        return this.update.getRow();
 +    }
 +
 +    public Result getCurrentRowState() {
 +        KeyValueScanner scanner = this.memstore.getScanner();
 +        List<Cell> kvs = new ArrayList<Cell>();
 +        while (scanner.peek() != null) {
 +            try {
 +                kvs.add(scanner.next());
 +            } catch (IOException e) {
 +                // this should never happen - something has gone terribly 
awry if it has
 +                throw new RuntimeException("Local MemStore threw 
IOException!");
 +            }
 +        }
 +        return Result.create(kvs);
 +    }
 +
 +    /**
 +     * Helper to add a {@link Mutation} to the values stored for the current 
row
 +     * 
 +     * @param pendingUpdate
 +     *            update to apply
 +     */
 +    public void addUpdateForTesting(Mutation pendingUpdate) {
 +        for (Map.Entry<byte[], List<Cell>> e : 
pendingUpdate.getFamilyCellMap().entrySet()) {
-             List<Cell> edits = e.getValue();
++              List<KeyValue> edits = 
KeyValueUtil.ensureKeyValues(e.getValue());
 +            addUpdate(edits);
 +        }
 +    }
 +
 +    /**
 +     * @param hints
 +     */
 +    public void setHints(List<? extends IndexedColumnGroup> hints) {
 +        this.hints = hints;
 +    }
 +
 +    @Override
 +    public List<? extends IndexedColumnGroup> getIndexColumnHints() {
 +        return this.hints;
 +    }
 +
 +    @Override
-     public Collection<Cell> getPendingUpdate() {
++    public Collection<KeyValue> getPendingUpdate() {
 +        return this.kvs;
 +    }
 +
 +    /**
 +     * Set the {@link KeyValue}s in the update for which we are currently 
building an index update, but don't actually
 +     * apply them.
 +     * 
 +     * @param update
 +     *            pending {@link KeyValue}s
 +     */
-     public void setPendingUpdates(Collection<Cell> update) {
++    public void setPendingUpdates(Collection<KeyValue> update) {
 +        this.kvs.clear();
 +        this.kvs.addAll(update);
 +    }
 +
 +    /**
 +     * Apply the {@link KeyValue}s set in {@link 
#setPendingUpdates(Collection)}.
 +     */
 +    public void applyPendingUpdates() {
 +        this.addUpdate(kvs);
 +    }
 +
 +    /**
 +     * Rollback all the given values from the underlying state.
 +     * 
 +     * @param values
 +     */
-     public void rollback(Collection<Cell> values) {
-         for (Cell kv : values) {
-             this.memstore.rollback(KeyValueUtil.ensureKeyValue(kv));
++    public void rollback(Collection<KeyValue> values) {
++        for (KeyValue kv : values) {
++            this.memstore.rollback(kv);
 +        }
 +    }
 +
 +    @Override
 +    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? 
extends ColumnReference> indexedColumns)
 +            throws IOException {
 +        Pair<Scanner, IndexUpdate> pair = 
getIndexedColumnsTableState(indexedColumns);
 +        ValueGetter valueGetter = 
IndexManagementUtil.createGetterFromScanner(pair.getFirst(), 
getCurrentRowKey());
 +        return new Pair<ValueGetter, IndexUpdate>(valueGetter, 
pair.getSecond());
 +    }
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 11e7d1a,0000000..27af40f
mode 100644,000000..100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@@ -1,404 -1,0 +1,404 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
 + * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
 + * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
 + * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
 + * governing permissions and limitations under the License.
 + */
 +package org.apache.phoenix.hbase.index.covered;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.hbase.Cell;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.KeyValueUtil;
 +import org.apache.hadoop.hbase.client.Delete;
 +import org.apache.hadoop.hbase.client.Mutation;
 +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 +import org.apache.hadoop.hbase.util.Pair;
 +import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
 +import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 +import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 +import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.primitives.Longs;
 +
 +/**
 + * Build covered indexes for phoenix updates.
 + * <p>
 + * Before any call to prePut/preDelete, the row has already been locked. This 
ensures that we don't need to do any extra
 + * synchronization in the IndexBuilder.
 + * <p>
 + * NOTE: This implementation doesn't cleanup the index when we remove a 
key-value on compaction or flush, leading to a
 + * bloated index that needs to be cleaned up by a background process.
 + */
 +public class NonTxIndexBuilder extends BaseIndexBuilder {
 +    private static final Log LOG = LogFactory.getLog(NonTxIndexBuilder.class);
 +
 +    protected LocalHBaseState localTable;
 +
 +    @Override
 +    public void setup(RegionCoprocessorEnvironment env) throws IOException {
 +        super.setup(env);
 +        this.localTable = new LocalTable(env);
 +    }
 +
 +    @Override
 +    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation 
mutation, IndexMetaData indexMetaData) throws IOException {
 +      // create a state manager, so we can manage each batch
 +        LocalTableState state = new LocalTableState(env, localTable, 
mutation);
 +        // build the index updates for each group
 +        IndexUpdateManager manager = new IndexUpdateManager();
 +
 +        batchMutationAndAddUpdates(manager, state, mutation, indexMetaData);
 +
 +        if (LOG.isDebugEnabled()) {
 +            LOG.debug("Found index updates for Mutation: " + mutation + "\n" 
+ manager);
 +        }
 +
 +        return manager.toMap();
 +    }
 +
 +    /**
 +     * Split the mutation into batches based on the timestamps of each 
keyvalue. We need to check each key-value in the
 +     * update to see if it matches the others. Generally, this will be the 
case, but you can add kvs to a mutation that
 +     * don't all have the timestamp, so we need to manage everything in 
batches based on timestamp.
 +     * <p>
 +     * Adds all the updates in the {@link Mutation} to the state, as a 
side-effect.
 +     * @param state
 +     *            current state of the row for the mutation.
 +     * @param m
 +     *            mutation to batch
 +     * @param indexMetaData TODO
 +     * @param updateMap
 +     *            index updates into which to add new updates. Modified as a 
side-effect.
 +     * 
 +     * @throws IOException
 +     */
 +    private void batchMutationAndAddUpdates(IndexUpdateManager manager, 
LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws 
IOException {
 +        // split the mutation into timestamp-based batches
 +        Collection<Batch> batches = createTimestampBatchesFromMutation(m);
 +
 +        // go through each batch of keyvalues and build separate index 
entries for each
 +        boolean cleanupCurrentState = true;
 +        for (Batch batch : batches) {
 +            /*
 +             * We have to split the work between the cleanup and the update 
for each group because when we update the
 +             * current state of the row for the current batch (appending the 
mutations for the current batch) the next
 +             * group will see that as the current state, which can cause 
a delete and a put to be created for
 +             * the next group.
 +             */
 +            if (addMutationsForBatch(manager, batch, state, 
cleanupCurrentState, indexMetaData)) {
 +                cleanupCurrentState = false;
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. 
Updates any {@link KeyValue} with a timestamp
 +     * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time 
the method is called.
 +     * 
 +     * @param m
 +     *            {@link Mutation} from which to extract the {@link KeyValue}s
 +     * @return the mutation, broken into batches and sorted in ascending 
order (smallest first)
 +     */
 +    protected Collection<Batch> createTimestampBatchesFromMutation(Mutation 
m) {
 +        Map<Long, Batch> batches = new HashMap<Long, Batch>();
 +        for (List<Cell> family : m.getFamilyCellMap().values()) {
 +            List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
 +            createTimestampBatchesFromKeyValues(familyKVs, batches);
 +        }
 +        // sort the batches
 +        List<Batch> sorted = new ArrayList<Batch>(batches.values());
 +        Collections.sort(sorted, new Comparator<Batch>() {
 +            @Override
 +            public int compare(Batch o1, Batch o2) {
 +                return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
 +            }
 +        });
 +        return sorted;
 +    }
 +
 +    /**
 +     * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. 
Updates any {@link KeyValue} with a
 +     * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at 
the time the method is called.
 +     * 
 +     * @param kvs
 +     *            {@link KeyValue}s to break into batches
 +     * @param batches
 +     *            to update with the given kvs
 +     */
 +    protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> 
kvs, Map<Long, Batch> batches) {
-         long now = EnvironmentEdgeManager.currentTimeMillis();
++        long now = EnvironmentEdgeManager.currentTime();
 +        byte[] nowBytes = Bytes.toBytes(now);
 +
 +        // batch kvs by timestamp
 +        for (KeyValue kv : kvs) {
 +            long ts = kv.getTimestamp();
 +            // override the timestamp to the current time, so the index and 
primary tables match
 +            // all the keys with LATEST_TIMESTAMP will then be put into the 
same batch
 +            if (kv.updateLatestStamp(nowBytes)) {
 +                ts = now;
 +            }
 +            Batch batch = batches.get(ts);
 +            if (batch == null) {
 +                batch = new Batch(ts);
 +                batches.put(ts, batch);
 +            }
 +            batch.add(kv);
 +        }
 +    }
 +
 +    /**
 +     * For a single batch, get all the index updates and add them to the 
updateMap
 +     * <p>
 +     * This method manages cleaning up the entire history of the row from the 
given timestamp forward for out-of-order
 +     * (e.g. 'back in time') updates.
 +     * <p>
 +     * If things arrive out of order (client is using custom timestamps) we 
should still see the index in the correct
 +     * order (assuming we scan after the out-of-order update is finished). 
Therefore, when we aren't the most recent
 +     * update to the index, we need to delete the state at the current 
timestamp (similar to above), but also issue a
 +     * delete for the added index updates at the next newest timestamp of any 
of the columns in the update; we need to
 +     * cleanup the insert so it looks like it was also deleted at that next 
newest timestamp. However, it's not enough to
 +     * just update the one in front of us - that column will likely be 
applied to index entries up the entire history in
 +     * front of us, which also needs to be fixed up.
 +     * <p>
 +     * However, the current update usually will be the most recent thing to 
be added. In that case, all we need to do is
 +     * issue a delete for the previous index row (the state of the row, 
without the update applied) at the current
 +     * timestamp. This gets rid of anything currently in the index for the 
current state of the row (at the timestamp).
 +     * Then we can just follow that by applying the pending update and 
building the index update based on the new row
 +     * state.
 +     * 
 +     * @param updateMap
 +     *            map to update with new index elements
 +     * @param batch
 +     *            timestamp-based batch of edits
 +     * @param state
 +     *            local state to update and pass to the codec
 +     * @param requireCurrentStateCleanup
 +     *            <tt>true</tt> if we should should attempt to cleanup the 
current state of the table, in the event of a
 +     *            'back in time' batch. <tt>false</tt> indicates we should 
not attempt the cleanup, e.g. an earlier
 +     *            batch already did the cleanup.
 +     * @param indexMetaData TODO
 +     * @return <tt>true</tt> if we cleaned up the current state forward (had 
a back-in-time put), <tt>false</tt>
 +     *         otherwise
 +     * @throws IOException
 +     */
 +    private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch 
batch, LocalTableState state,
 +            boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) 
throws IOException {
 +
 +        // need a temporary manager for the current batch. It should resolve 
any conflicts for the
 +        // current batch. Essentially, we can get the case where a batch 
doesn't change the current
 +        // state of the index (all Puts are covered by deletes), in which 
case we don't want to add
 +        // anything
 +        // A. Get the correct values for the pending state in the batch
 +        // A.1 start by cleaning up the current state - as long as there are 
key-values in the batch
 +        // that are indexed, we need to change the current state of the 
index. Its up to the codec to
 +        // determine if we need to make any cleanup given the pending update.
 +        long batchTs = batch.getTimestamp();
 +        state.setPendingUpdates(batch.getKvs());
 +        addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
 +
 +        // A.2 do a single pass first for the updates to the current state
 +        state.applyPendingUpdates();
 +        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, 
indexMetaData);
 +        // if all the updates are the latest thing in the index, we are done 
- don't go and fix history
 +        if (ColumnTracker.isNewestTime(minTs)) { return false; }
 +
 +        // A.3 otherwise, we need to roll up through the current state and 
get the 'correct' view of the
 +        // index. after this, we have the correct view of the index, from the 
batch up to the index
 +        while (!ColumnTracker.isNewestTime(minTs)) {
 +            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, 
indexMetaData);
 +        }
 +
 +        // B. only cleanup the current state if we need to - it's a huge waste 
of effort otherwise.
 +        if (requireCurrentStateCleanup) {
 +            // roll back the pending update. This is needed so we can remove 
all the 'old' index entries.
 +            // We don't need to do the puts here, but just the deletes at the 
given timestamps since we
 +            // just want to completely hide the incorrect entries.
 +            state.rollback(batch.getKvs());
 +            // setup state
 +            state.setPendingUpdates(batch.getKvs());
 +
 +            // cleanup the pending batch. If anything in the correct history 
is covered by Deletes used to
 +            // 'fix' history (same row key and ts), we just drop the delete 
(we don't want to drop both
 +            // because the update may have a different set of columns or 
value based on the update).
 +            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, 
indexMetaData);
 +
 +            // have to roll the state forward again, so the current state is 
correct
 +            state.applyPendingUpdates();
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    private long addUpdateForGivenTimestamp(long ts, LocalTableState state, 
IndexUpdateManager updateMap, IndexMetaData indexMetaData)
 +            throws IOException {
 +        state.setCurrentTimestamp(ts);
 +        ts = addCurrentStateMutationsForBatch(updateMap, state, 
indexMetaData);
 +        return ts;
 +    }
 +
 +    private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long 
batchTs, LocalTableState state, IndexMetaData indexMetaData)
 +            throws IOException {
 +        // get the cleanup for the current state
 +        state.setCurrentTimestamp(batchTs);
 +        addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
 +        // ignore any index tracking from the delete
 +        state.resetTrackedColumns();
 +    }
 +
 +    /**
 +     * Add the necessary mutations for the pending batch on the local state. 
Handles rolling up through history to
 +     * determine the index changes after applying the batch (for the case 
where the batch is back in time).
 +     * 
 +     * @param updateMap
 +     *            to update with index mutations
 +     * @param state
 +     *            current state of the table
 +     * @param indexMetaData TODO
 +     * @param batch
 +     *            to apply to the current state
 +     * @return the minimum timestamp across all index columns requested. If 
{@link ColumnTracker#isNewestTime(long)}
 +     *         returns <tt>true</tt> on the returned timestamp, we know that 
this <i>was not a back-in-time update</i>.
 +     * @throws IOException
 +     */
 +    private long addCurrentStateMutationsForBatch(IndexUpdateManager 
updateMap, LocalTableState state, IndexMetaData indexMetaData)
 +            throws IOException {
 +
 +        // get the index updates for this current batch
 +        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, 
indexMetaData);
 +        state.resetTrackedColumns();
 +
 +        /*
 +         * go through all the pending updates. If we are sure that all the 
entries are the latest timestamp, we can just
 +         * add the index updates and move on. However, if there are columns 
that we skip past (based on the timestamp of
 +         * the batch), we need to roll back up the history. Regardless of 
whether or not they are the latest timestamp,
 +         * the entries here are going to be correct for the current batch 
timestamp, so we add them to the updates. The
 +         * only thing we really care about is if we need to roll up 
history and fix it as we go.
 +         */
 +        // timestamp of the next update we need to track
 +        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
 +        List<IndexedColumnGroup> columnHints = new 
ArrayList<IndexedColumnGroup>();
 +        for (IndexUpdate update : upserts) {
 +            // this is the one bit where we check the timestamps
 +            final ColumnTracker tracker = update.getIndexedColumns();
 +            long trackerTs = tracker.getTS();
 +            // update the next min TS we need to track
 +            if (trackerTs < minTs) {
 +                minTs = tracker.getTS();
 +            }
 +            // track index hints for the next round. Hint if we need an 
update for that column for the
 +            // next timestamp. These columns clearly won't need to update as 
we go through time as they
 +            // already match the most recent possible thing.
 +            boolean needsCleanup = false;
 +            if (tracker.hasNewerTimestamps()) {
 +                columnHints.add(tracker);
 +                // this update also needs to be cleaned up at the next 
timestamp because it is not the latest.
 +                needsCleanup = true;
 +            }
 +
 +            // only make the put if the index update has been setup
 +            if (update.isValid()) {
 +                byte[] table = update.getTableName();
 +                Mutation mutation = update.getUpdate();
 +                updateMap.addIndexUpdate(table, mutation);
 +
 +                // only make the cleanup if we made a put and need cleanup
 +                if (needsCleanup) {
 +                    // there is a TS for the interested columns that is 
greater than the columns in the
 +                    // put. Therefore, we need to issue a delete at the same 
timestamp
 +                    Delete d = new Delete(mutation.getRow());
 +                    d.setTimestamp(tracker.getTS());
 +                    updateMap.addIndexUpdate(table, d);
 +                }
 +            }
 +        }
 +        return minTs;
 +    }
 +
 +    /**
 +     * Cleanup the index based on the current state from the given batch. 
Iterates over each timestamp (for the indexed
 +     * rows) for the current state of the table and cleans up all the 
existing entries generated by the codec.
 +     * <p>
 +     * Adds all pending updates to the updateMap
 +     * 
 +     * @param updateMap
 +     *            updated with the pending index updates from the codec
 +     * @param batchTs
 +     *            timestamp from which we should cleanup
 +     * @param state
 +     *            current state of the primary table. Should already be setup 
to the correct state from which we want to
 +     *            cleanup.
 +     * @param indexMetaData TODO
 +     * @throws IOException
 +     */
 +    private void cleanupIndexStateFromBatchOnward(IndexUpdateManager 
updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
 +            throws IOException {
 +        // get the cleanup for the current state
 +        state.setCurrentTimestamp(batchTs);
 +        addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
 +        Set<ColumnTracker> trackers = state.getTrackedColumns();
 +        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
 +        for (ColumnTracker tracker : trackers) {
 +            if (tracker.getTS() < minTs) {
 +                minTs = tracker.getTS();
 +            }
 +        }
 +        state.resetTrackedColumns();
 +        if (!ColumnTracker.isNewestTime(minTs)) {
 +            state.setHints(Lists.newArrayList(trackers));
 +            cleanupIndexStateFromBatchOnward(updateMap, minTs, state, 
indexMetaData);
 +        }
 +    }
 +
 +    /**
 +     * Get the index deletes from the codec {@link 
IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
 +     * update map.
 +     * <p>
 +     * Expects the {@link LocalTableState} to already be correctly setup 
(correct timestamp, updates applied, etc).
 +     * @param indexMetaData TODO
 +     * 
 +     * @throws IOException
 +     */
 +    protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, 
LocalTableState state, long ts, IndexMetaData indexMetaData)
 +            throws IOException {
 +        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, 
indexMetaData);
 +        if (cleanup != null) {
 +            for (IndexUpdate d : cleanup) {
 +                if (!d.isValid()) {
 +                    continue;
 +                }
 +                // override the timestamps in the delete to match the current 
batch.
 +                Delete remove = (Delete)d.getUpdate();
 +                remove.setTimestamp(ts);
 +                updateMap.addIndexUpdate(d.getTableName(), remove);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public Collection<Pair<Mutation, byte[]>> 
getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData 
indexMetaData)
 +            throws IOException {
 +        // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
 +        return null;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index d8b215c,4c4d0b0..0e961db
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@@ -23,7 -23,7 +23,8 @@@ import java.util.Collection
  import java.util.List;
  import java.util.Map;
  
 +import org.apache.hadoop.hbase.Cell;
+ import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.client.Mutation;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  import org.apache.hadoop.hbase.util.Pair;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 59bc8de,4b953e6..4efca9f
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@@ -38,327 -44,325 +38,327 @@@ import com.google.common.collect.Lists
   */
  public class CoveredColumnIndexCodec extends BaseIndexCodec {
  
 -  private static final byte[] EMPTY_BYTES = new byte[0];
 -  public static final byte[] INDEX_ROW_COLUMN_FAMILY = 
Bytes.toBytes("INDEXED_COLUMNS");
 +    private static final byte[] EMPTY_BYTES = new byte[0];
 +    public static final byte[] INDEX_ROW_COLUMN_FAMILY = 
Bytes.toBytes("INDEXED_COLUMNS");
  
 -  private List<ColumnGroup> groups;
 +    private List<ColumnGroup> groups;
  
 -  /**
 -   * @param groups to initialize the codec with
 -   * @return an instance that is initialized with the given {@link 
ColumnGroup}s, for testing
 -   *         purposes
 -   */
 -  public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> 
groups) {
 -    CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
 -    codec.groups = Lists.newArrayList(groups);
 -    return codec;
 -  }
 -
 -  @Override
 -  public void initialize(RegionCoprocessorEnvironment env) {
 -    groups = 
CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
 -  }
 -
 -  @Override
 -  public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
 -    List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
 -    for (ColumnGroup group : groups) {
 -      IndexUpdate update = getIndexUpdateForGroup(group, state);
 -      updates.add(update);
 +    /**
 +     * @param groups
 +     *            to initialize the codec with
 +     * @return an instance that is initialized with the given {@link 
ColumnGroup}s, for testing purposes
 +     */
 +    public static CoveredColumnIndexCodec 
getCodecForTesting(List<ColumnGroup> groups) {
 +        CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
 +        codec.groups = Lists.newArrayList(groups);
 +        return codec;
      }
 -    return updates;
 -  }
  
 -  /**
 -   * @param group
 -   * @param state
 -   * @return the update that should be made to the table
 -   */
 -  private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState 
state) {
 -    List<CoveredColumn> refs = group.getColumns();
 -    try {
 -      Pair<Scanner, IndexUpdate> stateInfo = 
state.getIndexedColumnsTableState(refs);
 -      Scanner kvs = stateInfo.getFirst();
 -      Pair<Integer, List<ColumnEntry>> columns =
 -          getNextEntries(refs, kvs, state.getCurrentRowKey());
 -      // make sure we close the scanner
 -      kvs.close();
 -      if (columns.getFirst().intValue() == 0) {
 -        return stateInfo.getSecond();
 -      }
 -      // have all the column entries, so just turn it into a Delete for the 
row
 -      // convert the entries to the needed values
 -      byte[] rowKey =
 -          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), 
columns.getSecond());
 -      Put p = new Put(rowKey, state.getCurrentTimestamp());
 -      // add the columns to the put
 -      addColumnsToPut(p, columns.getSecond());
 -
 -      // update the index info
 -      IndexUpdate update = stateInfo.getSecond();
 -      update.setTable(Bytes.toBytes(group.getTable()));
 -      update.setUpdate(p);
 -      return update;
 -    } catch (IOException e) {
 -      throw new RuntimeException("Unexpected exception when getting state for 
columns: " + refs);
 +    @Override
 +    public void initialize(RegionCoprocessorEnvironment env) {
 +        groups = 
CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
      }
 -  }
  
 -  private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> 
columns) {
 -    // add each of the corresponding families to the put
 -    int count = 0;
 -    for (ColumnEntry column : columns) {
 -      indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
 -        ArrayUtils.addAll(Bytes.toBytes(count++), 
toIndexQualifier(column.ref)), null);
 -    }
 +    @Override
 +    public Iterable<IndexUpdate> getIndexUpserts(TableState state, 
IndexMetaData context) {
 +        List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
 +        for (ColumnGroup group : groups) {
 +            IndexUpdate update = getIndexUpdateForGroup(group, state);
 +            updates.add(update);
 +        }
 +        return updates;
-     }
+   }
  
 -  private static byte[] toIndexQualifier(CoveredColumn column) {
 -    return ArrayUtils.addAll(Bytes.toBytes(column.familyString + 
CoveredColumn.SEPARATOR),
 -      column.getQualifier());
 -  }
 +    /**
 +     * @param group
 +     * @param state
 +     * @return the update that should be made to the table
 +     */
 +    private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState 
state) {
 +        List<CoveredColumn> refs = group.getColumns();
 +        try {
 +            Pair<Scanner, IndexUpdate> stateInfo = 
((LocalTableState)state).getIndexedColumnsTableState(refs);
 +            Scanner kvs = stateInfo.getFirst();
 +            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, 
kvs, state.getCurrentRowKey());
 +            // make sure we close the scanner
 +            kvs.close();
 +            if (columns.getFirst().intValue() == 0) { return 
stateInfo.getSecond(); }
 +            // have all the column entries, so just turn it into a Delete for 
the row
 +            // convert the entries to the needed values
 +            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), 
columns.getFirst(), columns.getSecond());
 +            Put p = new Put(rowKey, state.getCurrentTimestamp());
 +            // add the columns to the put
 +            addColumnsToPut(p, columns.getSecond());
 +
 +            // update the index info
 +            IndexUpdate update = stateInfo.getSecond();
 +            update.setTable(Bytes.toBytes(group.getTable()));
 +            update.setUpdate(p);
 +            return update;
 +        } catch (IOException e) {
 +            throw new RuntimeException("Unexpected exception when getting 
state for columns: " + refs);
 +        }
 +    }
  
 -  @Override
 -  public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
 -    List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
 -    for (ColumnGroup group : groups) {
 -      deletes.add(getDeleteForGroup(group, state));
 +    private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> 
columns) {
 +        // add each of the corresponding families to the put
 +        int count = 0;
 +        for (ColumnEntry column : columns) {
 +            indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
 +                    ArrayUtils.addAll(Bytes.toBytes(count++), 
toIndexQualifier(column.ref)), null);
 +        }
      }
 -    return deletes;
 -  }
  
 +    private static byte[] toIndexQualifier(CoveredColumn column) {
 +        return ArrayUtils.addAll(Bytes.toBytes(column.familyString + 
CoveredColumn.SEPARATOR), column.getQualifier());
 +    }
  
 -  /**
 -   * Get all the deletes necessary for a group of columns - logically, the 
cleanup the index table
 -   * for a given index.
 -   * @param group index information
 -   * @return the cleanup for the given index, or <tt>null</tt> if no cleanup 
is necessary
 -   */
 -  private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
 -    List<CoveredColumn> refs = group.getColumns();
 -    try {
 -      Pair<Scanner, IndexUpdate> kvs = 
state.getIndexedColumnsTableState(refs);
 -      Pair<Integer, List<ColumnEntry>> columns =
 -          getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
 -      // make sure we close the scanner reference
 -      kvs.getFirst().close();
 -      // no change, just return the passed update
 -      if (columns.getFirst() == 0) {
 -        return kvs.getSecond();
 -      }
 -      // have all the column entries, so just turn it into a Delete for the 
row
 -      // convert the entries to the needed values
 -      byte[] rowKey =
 -          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), 
columns.getSecond());
 -      Delete d = new Delete(rowKey);
 -      d.setTimestamp(state.getCurrentTimestamp());
 -      IndexUpdate update = kvs.getSecond();
 -      update.setUpdate(d);
 -      update.setTable(Bytes.toBytes(group.getTable()));
 -      return update;
 -    } catch (IOException e) {
 -      throw new RuntimeException("Unexpected exception when getting state for 
columns: " + refs);
 +    @Override
 +    public Iterable<IndexUpdate> getIndexDeletes(TableState state, 
IndexMetaData context) {
 +        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
 +        for (ColumnGroup group : groups) {
 +            deletes.add(getDeleteForGroup(group, state));
 +        }
 +        return deletes;
      }
 -  }
  
 -  /**
 -   * Get the next batch of primary table values for the given columns
 -   * @param refs columns to match against
 -   * @param kvs
 -   * @param currentRow
 -   * @return the total length of all values found and the entries to add for 
the index
 -   */
 -  private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> 
refs, Scanner kvs,
 -      byte[] currentRow) throws IOException {
 -    int totalValueLength = 0;
 -    List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
 -
 -    // pull out the latest state for each column reference, in order
 -    for (CoveredColumn ref : refs) {
 -      KeyValue first = ref.getFirstKeyValueForRow(currentRow);
 -      if (!kvs.seek(first)) {
 -        // no more keys, so add a null value
 -        entries.add(new ColumnEntry(null, ref));
 -        continue;
 -      }
 -      // there is a next value - we only care about the current value, so we 
can just snag that
 -      Cell next = kvs.next();
 -      if (ref.matchesFamily(next.getFamily()) && 
ref.matchesQualifier(next.getQualifier())) {
 -        byte[] v = next.getValue();
 -        totalValueLength += v.length;
 -        entries.add(new ColumnEntry(v, ref));
 -      } else {
 -        // this first one didn't match at all, so we have to put in a null 
entry
 -        entries.add(new ColumnEntry(null, ref));
 -        continue;
 -      }
 -      // here's where is gets a little tricky - we either need to decide if 
we should continue
 -      // adding entries (matches all qualifiers) or if we are done (matches a 
single qualifier)
 -      if (!ref.allColumns()) {
 -        continue;
 -      }
 -      // matches all columns, so we need to iterate until we hit the next 
column with the same
 -      // family as the current key
 -      byte[] lastQual = next.getQualifier();
 -      byte[] nextQual = null;
 -      while ((next = kvs.next()) != null) {
 -        // different family, done with this column
 -        if (!ref.matchesFamily(next.getFamily())) {
 -          break;
 +    /**
 +     * Get all the deletes necessary for a group of columns - logically, the 
cleanup of the index table for a given index.
 +     * 
 +     * @param group
 +     *            index information
 +     * @return the cleanup for the given index, or <tt>null</tt> if no 
cleanup is necessary
 +     */
 +    private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState 
state) {
 +        List<CoveredColumn> refs = group.getColumns();
 +        try {
 +            Pair<Scanner, IndexUpdate> kvs = 
((LocalTableState)state).getIndexedColumnsTableState(refs);
 +            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, 
kvs.getFirst(), state.getCurrentRowKey());
 +            // make sure we close the scanner reference
 +            kvs.getFirst().close();
 +            // no change, just return the passed update
 +            if (columns.getFirst() == 0) { return kvs.getSecond(); }
 +            // have all the column entries, so just turn it into a Delete for 
the row
 +            // convert the entries to the needed values
 +            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), 
columns.getFirst(), columns.getSecond());
 +            Delete d = new Delete(rowKey);
 +            d.setTimestamp(state.getCurrentTimestamp());
 +            IndexUpdate update = kvs.getSecond();
 +            update.setUpdate(d);
 +            update.setTable(Bytes.toBytes(group.getTable()));
 +            return update;
 +        } catch (IOException e) {
 +            throw new RuntimeException("Unexpected exception when getting 
state for columns: " + refs);
          }
 -        nextQual = next.getQualifier();
 -        // we are still on the same qualifier - skip it, since we already 
added a column for it
 -        if (Arrays.equals(lastQual, nextQual)) {
 -          continue;
 +    }
 +
 +    /**
 +     * Get the next batch of primary table values for the given columns
 +     * 
 +     * @param refs
 +     *            columns to match against
 +     * @param state
 +     * @return the total length of all values found and the entries to add 
for the index
 +     */
 +    private Pair<Integer, List<ColumnEntry>> 
getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
 +            throws IOException {
 +        int totalValueLength = 0;
 +        List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
 +
 +        // pull out the latest state for each column reference, in order
 +        for (CoveredColumn ref : refs) {
 +            KeyValue first = ref.getFirstKeyValueForRow(currentRow);
 +            if (!kvs.seek(first)) {
 +                // no more keys, so add a null value
 +                entries.add(new ColumnEntry(null, ref));
 +                continue;
 +            }
 +            // there is a next value - we only care about the current value, 
so we can just snag that
 +            Cell next = kvs.next();
 +            if (ref.matchesFamily(next.getFamily()) && 
ref.matchesQualifier(next.getQualifier())) {
 +                byte[] v = next.getValue();
 +                totalValueLength += v.length;
 +                entries.add(new ColumnEntry(v, ref));
 +            } else {
 +                // this first one didn't match at all, so we have to put in a 
null entry
 +                entries.add(new ColumnEntry(null, ref));
 +                continue;
 +            }
 +            // here's where it gets a little tricky - we either need to 
decide if we should continue
 +            // adding entries (matches all qualifiers) or if we are done 
(matches a single qualifier)
 +            if (!ref.allColumns()) {
 +                continue;
 +            }
 +            // matches all columns, so we need to iterate until we hit the 
next column with the same
 +            // family as the current key
 +            byte[] lastQual = next.getQualifier();
 +            byte[] nextQual = null;
 +            while ((next = kvs.next()) != null) {
 +                // different family, done with this column
 +                if (!ref.matchesFamily(next.getFamily())) {
 +                    break;
 +                }
 +                nextQual = next.getQualifier();
 +                // we are still on the same qualifier - skip it, since we 
already added a column for it
 +                if (Arrays.equals(lastQual, nextQual)) {
 +                    continue;
 +                }
 +                // this must match the qualifier since it's an all-qualifiers 
specifier, so we add it
 +                byte[] v = next.getValue();
 +                totalValueLength += v.length;
 +                entries.add(new ColumnEntry(v, ref));
 +                // update the last qualifier to check against
 +                lastQual = nextQual;
 +            }
          }
 -        // this must match the qualifier since its an all-qualifiers 
specifier, so we add it
 -        byte[] v = next.getValue();
 -        totalValueLength += v.length;
 -        entries.add(new ColumnEntry(v, ref));
 -        // update the last qualifier to check against
 -        lastQual = nextQual;
 -      }
 +        return new Pair<Integer, List<ColumnEntry>>(totalValueLength, 
entries);
      }
 -    return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
 -  }
  
 -  static class ColumnEntry {
 -    byte[] value = EMPTY_BYTES;
 -    CoveredColumn ref;
 +    static class ColumnEntry {
 +        byte[] value = EMPTY_BYTES;
 +        CoveredColumn ref;
  
 -    public ColumnEntry(byte[] value, CoveredColumn ref) {
 -      this.value = value == null ? EMPTY_BYTES : value;
 -      this.ref = ref;
 +        public ColumnEntry(byte[] value, CoveredColumn ref) {
 +            this.value = value == null ? EMPTY_BYTES : value;
 +            this.ref = ref;
 +        }
      }
 -  }
  
 -  /**
 -   * Compose the final index row key.
 -   * <p>
 -   * This is faster than adding each value independently as we can just build 
a single a array and
 -   * copy everything over once.
 -   * @param pk primary key of the original row
 -   * @param length total number of bytes of all the values that should be 
added
 -   * @param values to use when building the key
 -   */
 -  static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> 
values) {
 -    // now build up expected row key, each of the values, in order, followed 
by the PK and then some
 -    // info about lengths so we can deserialize each value
 -    byte[] output = new byte[length + pk.length];
 -    int pos = 0;
 -    int[] lengths = new int[values.size()];
 -    int i = 0;
 -    for (ColumnEntry entry : values) {
 -      byte[] v = entry.value;
 -      // skip doing the copy attempt, if we don't need to
 -      if (v.length != 0) {
 -        System.arraycopy(v, 0, output, pos, v.length);
 -        pos += v.length;
 -      }
 -      lengths[i++] = v.length;
 -    }
 +    /**
 +     * Compose the final index row key.
 +     * <p>
 +     * This is faster than adding each value independently as we can just 
build a single a array and copy everything
 +     * over once.
 +     * 
 +     * @param pk
 +     *            primary key of the original row
 +     * @param length
 +     *            total number of bytes of all the values that should be added
 +     * @param values
 +     *            to use when building the key
 +     */
 +    static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> 
values) {
 +        // now build up expected row key, each of the values, in order, 
followed by the PK and then some
 +        // info about lengths so we can deserialize each value
 +        byte[] output = new byte[length + pk.length];
 +        int pos = 0;
 +        int[] lengths = new int[values.size()];
 +        int i = 0;
 +        for (ColumnEntry entry : values) {
 +            byte[] v = entry.value;
 +            // skip doing the copy attempt, if we don't need to
 +            if (v.length != 0) {
 +                System.arraycopy(v, 0, output, pos, v.length);
 +                pos += v.length;
 +            }
 +            lengths[i++] = v.length;
 +        }
 +
 +        // add the primary key to the end of the row key
 +        System.arraycopy(pk, 0, output, pos, pk.length);
  
 -    // add the primary key to the end of the row key
 -    System.arraycopy(pk, 0, output, pos, pk.length);
 +        // add the lengths as suffixes so we can deserialize the elements 
again
 +        for (int l : lengths) {
 +            output = ArrayUtils.addAll(output, Bytes.toBytes(l));
 +        }
  
 -    // add the lengths as suffixes so we can deserialize the elements again
 -    for (int l : lengths) {
 -      output = ArrayUtils.addAll(output, Bytes.toBytes(l));
 +        // and the last integer is the number of values
 +        return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
      }
  
 -    // and the last integer is the number of values
 -    return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
 -  }
 +    /**
 +     * Essentially a short-cut from building a {@link Put}.
 +     * 
 +     * @param pk
 +     *            row key
 +     * @param timestamp
 +     *            timestamp of all the keyvalues
 +     * @param values
 +     *            expected value--column pair
 +     * @return the keyvalues that the index contains for a given row at a 
timestamp with the given value -- column pairs.
 +     */
 +    public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long 
timestamp,
 +            List<Pair<byte[], CoveredColumn>> values) {
 +
 +        int length = 0;
 +        List<ColumnEntry> expected = new 
ArrayList<ColumnEntry>(values.size());
 +        for (Pair<byte[], CoveredColumn> value : values) {
 +            ColumnEntry entry = new ColumnEntry(value.getFirst(), 
value.getSecond());
 +            length += value.getFirst().length;
 +            expected.add(entry);
 +        }
  
 -  /**
 -   * Essentially a short-cut from building a {@link Put}.
 -   * @param pk row key
 -   * @param timestamp timestamp of all the keyvalues
 -   * @param values expected value--column pair
 -   * @return a keyvalues that the index contains for a given row at a 
timestamp with the given value
 -   *         -- column pairs.
 -   */
 -  public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long 
timestamp,
 -      List<Pair<byte[], CoveredColumn>> values) {
 -  
 -    int length = 0;
 -    List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
 -    for (Pair<byte[], CoveredColumn> value : values) {
 -      ColumnEntry entry = new ColumnEntry(value.getFirst(), 
value.getSecond());
 -      length += value.getFirst().length;
 -      expected.add(entry);
 -    }
 -  
 -    byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, 
expected);
 -    Put p = new Put(rowKey, timestamp);
 -    CoveredColumnIndexCodec.addColumnsToPut(p, expected);
 -    List<KeyValue> kvs = new ArrayList<KeyValue>();
 -    for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
 -      kvs.addAll(entry.getValue());
 -    }
 -  
 -    return kvs;
 -  }
 +        byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, 
expected);
 +        Put p = new Put(rowKey, timestamp);
 +        CoveredColumnIndexCodec.addColumnsToPut(p, expected);
 +        List<KeyValue> kvs = new ArrayList<KeyValue>();
 +        for (Entry<byte[], List<KeyValue>> entry : 
p.getFamilyMap().entrySet()) {
 +            kvs.addAll(entry.getValue());
 +        }
  
 -  public static List<byte[]> getValues(byte[] bytes) {
 -    // get the total number of keys in the bytes
 -    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
 -    List<byte[]> keys = new ArrayList<byte[]>(keyCount);
 -    int[] lengths = new int[keyCount];
 -    int lengthPos = keyCount - 1;
 -    int pos = bytes.length - Bytes.SIZEOF_INT;
 -    // figure out the length of each key
 -    for (int i = 0; i < keyCount; i++) {
 -      lengths[lengthPos--] = 
CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
 -      pos -= Bytes.SIZEOF_INT;
 +        return kvs;
      }
  
 -    int current = 0;
 -    for (int length : lengths) {
 -      byte[] key = Arrays.copyOfRange(bytes, current, current + length);
 -      keys.add(key);
 -      current += length;
 -    }
 +    public static List<byte[]> getValues(byte[] bytes) {
 +        // get the total number of keys in the bytes
 +        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
 +        List<byte[]> keys = new ArrayList<byte[]>(keyCount);
 +        int[] lengths = new int[keyCount];
 +        int lengthPos = keyCount - 1;
 +        int pos = bytes.length - Bytes.SIZEOF_INT;
 +        // figure out the length of each key
 +        for (int i = 0; i < keyCount; i++) {
 +            lengths[lengthPos--] = 
CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
 +            pos -= Bytes.SIZEOF_INT;
 +        }
  
 -    return keys;
 -  }
 +        int current = 0;
 +        for (int length : lengths) {
 +            byte[] key = Arrays.copyOfRange(bytes, current, current + length);
 +            keys.add(key);
 +            current += length;
 +        }
  
 -  /**
 -   * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
 -   * @param bytes array to read from
 -   * @param start start point, backwards from which to read. For example, if 
specifying "25", we
 -   *          would try to read an integer from 21 -> 25
 -   * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, 
if it exists.
 -   */
 -  private static int getPreviousInteger(byte[] bytes, int start) {
 -    return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
 -  }
 +        return keys;
 +    }
  
 -  /**
 -   * Check to see if an row key just contains a list of null values.
 -   * @param bytes row key to examine
 -   * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> 
otherwise
 -   */
 -  public static boolean checkRowKeyForAllNulls(byte[] bytes) {
 -    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
 -    int pos = bytes.length - Bytes.SIZEOF_INT;
 -    for (int i = 0; i < keyCount; i++) {
 -      int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
 -      if (next > 0) {
 -        return false;
 -      }
 -      pos -= Bytes.SIZEOF_INT;
 +    /**
 +     * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
 +     * 
 +     * @param bytes
 +     *            array to read from
 +     * @param start
 +     *            start point, backwards from which to read. For example, if 
specifying "25", we would try to read an
 +     *            integer from 21 -> 25
 +     * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} 
bytes, if it exists.
 +     */
 +    private static int getPreviousInteger(byte[] bytes, int start) {
 +        return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
      }
  
 -    return true;
 -  }
 +    /**
 +     * Check to see if a row key just contains a list of null values.
 +     * 
 +     * @param bytes
 +     *            row key to examine
 +     * @return <tt>true</tt> if all the values are zero-length, 
<tt>false</tt> otherwise
 +     */
 +    public static boolean checkRowKeyForAllNulls(byte[] bytes) {
 +        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, 
bytes.length);
 +        int pos = bytes.length - Bytes.SIZEOF_INT;
 +        for (int i = 0; i < keyCount; i++) {
 +            int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
 +            if (next > 0) { return false; }
 +            pos -= Bytes.SIZEOF_INT;
 +        }
  
 -  @Override
 -  public boolean isEnabled(Mutation m) {
 -    // this could be a bit smarter, looking at the groups for the mutation, 
but we leave it at this
 -    // simple check for the moment.
 -    return groups.size() > 0;
 -  }
 +        return true;
 +    }
 +
 +    @Override
 +    public boolean isEnabled(Mutation m) {
 +        // this could be a bit smarter, looking at the groups for the 
mutation, but we leave it at this
 +        // simple check for the moment.
 +        return groups.size() > 0;
 +    }
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 362ef18,ff33ec2..e120268
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@@ -137,10 -136,10 +137,10 @@@ public class ScannerBuilder 
        }
  
        @Override
 -      public boolean seek(KeyValue next) throws IOException {
 +      public boolean seek(Cell next) throws IOException {
          // check to see if the next kv is after the current key, in which 
case we can use reseek,
          // which will be more efficient
-         KeyValue peek = kvScanner.peek();
+         Cell peek = kvScanner.peek();
          // there is another value and its before the requested one - we can 
do a reseek!
          if (peek != null) {
            int compare = KeyValue.COMPARATOR.compare(peek, next);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index bd0461d,b060345..1f64003
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@@ -301,7 -270,7 +307,8 @@@ public class IndexMaintainer implement
  
      private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection 
connection) {
          this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
 +        assert(dataTable.getType() == PTableType.SYSTEM || 
dataTable.getType() == PTableType.TABLE || dataTable.getType() == 
PTableType.VIEW);
+         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
          this.isMultiTenant = dataTable.isMultiTenant();
          this.viewIndexId = index.getViewIndexId() == null ? null : 
MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
          this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
@@@ -820,31 -827,17 +866,31 @@@
          return put;
      }
  
 -    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
 +    private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
-     private DeleteType getDeleteTypeOrNull(Collection<Cell> pendingUpdates) {
++    private DeleteType getDeleteTypeOrNull(Collection<KeyValue> 
pendingUpdates) {
          int nDeleteCF = 0;
 +        int nDeleteVersionCF = 0;
-         for (Cell kv : pendingUpdates) {
+         for (KeyValue kv : pendingUpdates) {
 -            if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
 -                nDeleteCF++;
 +              if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamilyVersion.getCode()) {
 +                nDeleteVersionCF++;
              }
 +              else if (kv.getTypeByte() == 
KeyValue.Type.DeleteFamily.getCode()
 +                              // Since we don't include the index rows in the 
change set for txn tables, we need to detect row deletes that have transformed 
by TransactionProcessor
 +                              // TODO see if implement 
PhoenixTransactionalIndexer.preDelete will work instead of the following check
 +                              || (CellUtil.matchingQualifier(kv, 
TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, 
HConstants.EMPTY_BYTE_ARRAY))) {
 +                  nDeleteCF++;
 +              }
          }
 -        return nDeleteCF == this.nDataCFs && nDeleteCF > 0;
 +        // This is what a delete looks like on the server side for mutable 
indexing...
 +        // Should all be one or the other for DeleteFamily versus 
DeleteFamilyVersion, but just in case not
 +        return nDeleteVersionCF >= this.nDataCFs ? DeleteType.SINGLE_VERSION 
: nDeleteCF + nDeleteVersionCF >= this.nDataCFs ? DeleteType.ALL_VERSIONS : 
null;
 +    }
 +    
-     public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
++    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
 +        return getDeleteTypeOrNull(pendingUpdates) != null;
      }
      
-     private boolean hasIndexedColumnChanged(ValueGetter oldState, 
Collection<Cell> pendingUpdates) throws IOException {
+     private boolean hasIndexedColumnChanged(ValueGetter oldState, 
Collection<KeyValue> pendingUpdates) throws IOException {
          if (pendingUpdates.isEmpty()) {
              return false;
          }
@@@ -884,25 -877,11 +930,25 @@@
      }
      
      @SuppressWarnings("deprecation")
-     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
+     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter 
oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> 
pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws 
IOException {
          byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, 
regionStartKey, regionEndKey);
          // Delete the entire row if any of the indexed columns changed
 -        if (oldState == null || isRowDeleted(pendingUpdates) || 
hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
 -            Delete delete = new Delete(indexRowKey, ts);
 +        DeleteType deleteType = null;
 +        if (oldState == null || 
(deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || 
hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
 +            byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
 +            Delete delete = new Delete(indexRowKey);
 +            // If table delete was single version, then index delete should 
be as well
 +            if (deleteType == DeleteType.SINGLE_VERSION) {
 +                for (ColumnReference ref : getCoverededColumns()) { // FIXME: 
Keep Set<byte[]> for index CFs?
 +                    delete.deleteFamilyVersion(ref.getFamily(), ts);
 +                }
 +                delete.deleteFamilyVersion(emptyCF, ts);
 +            } else {
 +                for (ColumnReference ref : getCoverededColumns()) { // FIXME: 
Keep Set<byte[]> for index CFs?
 +                    delete.deleteFamily(ref.getFamily(), ts);
 +                }
 +                delete.deleteFamily(emptyCF, ts);
 +            }
              delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : 
Durability.SKIP_WAL);
              return delete;
          }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 09a9f90,3ef01fe..0601e0a
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@@ -28,20 -26,15 +27,18 @@@ import org.apache.hadoop.conf.Configura
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.client.Mutation;
  import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.regionserver.HRegion;
  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+ import org.apache.hadoop.hbase.regionserver.Region;
  import org.apache.hadoop.hbase.regionserver.RegionScanner;
  import org.apache.phoenix.compile.ScanRanges;
 -import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
 +import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 +import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
  import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 +import org.apache.phoenix.hbase.index.write.IndexWriter;
  import org.apache.phoenix.query.KeyRange;
  import org.apache.phoenix.schema.types.PVarbinary;
- import org.apache.phoenix.util.ScanUtil;
- import org.apache.phoenix.util.SchemaUtil;
  
  import com.google.common.collect.Lists;
  
@@@ -93,11 -67,10 +90,11 @@@ public class PhoenixIndexBuilder extend
          }
          if (maintainers.isEmpty()) return;
          Scan scan = IndexManagementUtil.newLocalStateScan(new 
ArrayList<IndexMaintainer>(maintainers.values()));
 +        scan.setRaw(true);
-         ScanRanges scanRanges = 
ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, 
Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
          scanRanges.initializeScan(scan);
          scan.setFilter(scanRanges.getSkipScanFilter());
-         HRegion region = env.getRegion();
 -        Region region = this.env.getRegion();
++        Region region = env.getRegion();
          RegionScanner scanner = region.getScanner(scan);
          // Run through the scanner using internal nextRaw method
          region.startRegionOperation();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --cc 
phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 2719119,222aefb..7acc90c
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@@ -14,9 -23,8 +14,9 @@@ import java.util.Collections
  import java.util.List;
  import java.util.Map;
  
 -import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hbase.client.Delete;
- import org.apache.hadoop.hbase.client.Mutation;
 +import org.apache.hadoop.hbase.client.Put;
+ import org.apache.hadoop.hbase.client.Mutation;
  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.util.Pair;
@@@ -58,45 -113,71 +58,45 @@@ public class PhoenixIndexCodec extends 
      }
  
      @Override
 -    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws 
IOException {
 -        return getIndexUpdates(state, false);
 -    }
 -
 -    /**
 -     * 
 -     * @param state
 -     * @param upsert prepare index upserts if it's true otherwise prepare 
index deletes. 
 -     * @return
 -     * @throws IOException
 -     */
 -    private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean 
upsert) throws IOException {
 -        List<IndexMaintainer> indexMaintainers = 
getIndexMaintainers(state.getUpdateAttributes());
 -        if (indexMaintainers.isEmpty()) {
 +    public Iterable<IndexUpdate> getIndexUpserts(TableState state, 
IndexMetaData context) throws IOException {
 +        List<IndexMaintainer> indexMaintainers = 
((PhoenixIndexMetaData)context).getIndexMaintainers();
 +        if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
              return Collections.emptyList();
          }
 +        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +        ptr.set(state.getCurrentRowKey());
          List<IndexUpdate> indexUpdates = Lists.newArrayList();
 +        for (IndexMaintainer maintainer : indexMaintainers) {
 +            Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns());
 +            ValueGetter valueGetter = statePair.getFirst();
 +            IndexUpdate indexUpdate = statePair.getSecond();
 +            indexUpdate.setTable(maintainer.getIndexTableName());
 +            Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, 
ptr, state.getCurrentTimestamp(), env
-                     .getRegion().getStartKey(), env.getRegion().getEndKey());
++                    .getRegion().getRegionInfo().getStartKey(), 
env.getRegion().getRegionInfo().getEndKey());
 +            indexUpdate.setUpdate(put);
 +            indexUpdates.add(indexUpdate);
 +        }
 +        return indexUpdates;
 +    }
 +
 +    @Override
 +    public Iterable<IndexUpdate> getIndexDeletes(TableState state, 
IndexMetaData context) throws IOException {
 +        List<IndexMaintainer> indexMaintainers = 
((PhoenixIndexMetaData)context).getIndexMaintainers();
          ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 -        // TODO: state.getCurrentRowKey() should take an 
ImmutableBytesWritable arg to prevent byte copy
 -        byte[] dataRowKey = state.getCurrentRowKey();
 -        ptr.set(dataRowKey);
 -        byte[] localIndexTableName = 
MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName());
 -        ValueGetter valueGetter = null;
 -        Scanner scanner = null;
 +        ptr.set(state.getCurrentRowKey());
 +        List<IndexUpdate> indexUpdates = Lists.newArrayList();
          for (IndexMaintainer maintainer : indexMaintainers) {
 -            if(upsert) {
 -                // Short-circuit building state when we know it's a row 
deletion
 -                if (maintainer.isRowDeleted(state.getPendingUpdate())) {
 -                    continue;
 -                }
 -            }
 -            IndexUpdate indexUpdate = null;
 -            if (maintainer.isImmutableRows()) {
 -                indexUpdate = new IndexUpdate(new 
ColumnTracker(maintainer.getAllColumns()));
 -                if(maintainer.isLocalIndex()) {
 -                    indexUpdate.setTable(localIndexTableName);
 -                } else {
 -                    indexUpdate.setTable(maintainer.getIndexTableName());
 -                }
 -                valueGetter = 
maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
 -            } else {
 -                // TODO: if more efficient, I could do this just once with 
all columns in all indexes
 -                Pair<Scanner,IndexUpdate> statePair = 
state.getIndexedColumnsTableState(maintainer.getAllColumns());
 -                scanner = statePair.getFirst();
 -                indexUpdate = statePair.getSecond();
 -                indexUpdate.setTable(maintainer.getIndexTableName());
 -                valueGetter = 
IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
 -            }
 -            Mutation mutation = null;
 -            if (upsert) {
 -                mutation =
 -                        maintainer.buildUpdateMutation(kvBuilder, 
valueGetter, ptr, state.getCurrentTimestamp(),
 -                            env.getRegion().getRegionInfo().getStartKey(),
 -                            env.getRegion().getRegionInfo().getEndKey());
 -            } else {
 -                mutation =
 -                        maintainer.buildDeleteMutation(kvBuilder, 
valueGetter, ptr, state.getPendingUpdate(),
 -                            state.getCurrentTimestamp(), 
env.getRegion().getRegionInfo().getStartKey(),
 -                            env.getRegion().getRegionInfo().getEndKey());
 -            }
 -            indexUpdate.setUpdate(mutation);
 -            if (scanner != null) {
 -                scanner.close();
 -                scanner = null;
 -            }
 +            // For transactional tables, we use an index maintainer
 +            // to aid in rollback if there's a KeyValue column in the index. 
The alternative would be
 +            // to hold on to all uncommitted index row keys (even ones 
already sent to HBase) on the
 +            // client side.
 +            Pair<ValueGetter, IndexUpdate> statePair = 
state.getIndexUpdateState(maintainer.getAllColumns());
 +            ValueGetter valueGetter = statePair.getFirst();
 +            IndexUpdate indexUpdate = statePair.getSecond();
 +            indexUpdate.setTable(maintainer.getIndexTableName());
 +            Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, 
valueGetter, ptr, state.getPendingUpdate(),
-                     state.getCurrentTimestamp(), 
env.getRegion().getStartKey(), env.getRegion().getEndKey());
++                    state.getCurrentTimestamp(), 
env.getRegion().getRegionInfo().getStartKey(), 
env.getRegion().getRegionInfo().getEndKey());
 +            indexUpdate.setUpdate(delete);
              indexUpdates.add(indexUpdate);
          }
          return indexUpdates;

Reply via email to