http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index e4bc193..c4ed7a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered;
 
@@ -34,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -42,203 +34,244 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
 
 /**
  * Manage the state of the HRegion's view of the table, for the single row.
  * <p>
- * Currently, this is a single-use object - you need to create a new one for each row that you need
- * to manage. In the future, we could make this object reusable, but for the moment its easier to
- * manage as a throw-away object.
+ * Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the
+ * future, we could make this object reusable, but for the moment it's easier to manage as a throw-away object.
 * <p>
- * This class is <b>not</b> thread-safe - it requires external synchronization is access
- * concurrently.
+ * This class is <b>not</b> thread-safe - it requires external synchronization if accessed concurrently.
  */
 public class LocalTableState implements TableState {
 
-  private long ts;
-  private RegionCoprocessorEnvironment env;
-  private KeyValueStore memstore;
-  private LocalHBaseState table;
-  private Mutation update;
-  private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
-  private ScannerBuilder scannerBuilder;
-  private List<KeyValue> kvs = new ArrayList<KeyValue>();
-  private List<? extends IndexedColumnGroup> hints;
-  private CoveredColumns columnSet;
-
-  public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
-    this.env = environment;
-    this.table = table;
-    this.update = update;
-    this.memstore = new IndexMemStore();
-    this.scannerBuilder = new ScannerBuilder(memstore, update);
-    this.columnSet = new CoveredColumns();
-  }
-
-  public void addPendingUpdates(KeyValue... kvs) {
-    if (kvs == null) return;
-    addPendingUpdates(Arrays.asList(kvs));
-  }
-
-  public void addPendingUpdates(List<KeyValue> kvs) {
-    if(kvs == null) return;
-    setPendingUpdates(kvs);
-    addUpdate(kvs);
-  }
-
-  private void addUpdate(List<KeyValue> list) {
-    addUpdate(list, true);
-  }
-
-  private void addUpdate(List<KeyValue> list, boolean overwrite) {
-    if (list == null) return;
-    for (KeyValue kv : list) {
-      this.memstore.add(kv, overwrite);
-    }
-  }
-
-  @Override
-  public RegionCoprocessorEnvironment getEnvironment() {
-    return this.env;
-  }
-
-  @Override
-  public long getCurrentTimestamp() {
-    return this.ts;
-  }
-
-  @Override
-  public void setCurrentTimestamp(long timestamp) {
-    this.ts = timestamp;
-  }
-
-  public void resetTrackedColumns() {
-    this.trackedColumns.clear();
-  }
-
-  public Set<ColumnTracker> getTrackedColumns() {
-    return this.trackedColumns;
-  }
-
-  @Override
-  public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
-      Collection<? extends ColumnReference> indexedColumns) throws IOException {
-    ensureLocalStateInitialized(indexedColumns);
-    // filter out things with a newer timestamp and track the column references to which it applies
-    ColumnTracker tracker = new ColumnTracker(indexedColumns);
-    synchronized (this.trackedColumns) {
-      // we haven't seen this set of columns before, so we need to create a new tracker
-      if (!this.trackedColumns.contains(tracker)) {
-        this.trackedColumns.add(tracker);
-      }
-    }
-
-    Scanner scanner =
-        this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
-
-    return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
-  }
-
-  /**
-   * Initialize the managed local state. Generally, this will only be called by
-   * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside.
-   * Even then, there is still fairly low contention as each new Put/Delete will have its own table
-   * state.
-   */
-  private synchronized void ensureLocalStateInitialized(
-      Collection<? extends ColumnReference> columns) throws IOException {
-    // check to see if we haven't initialized any columns yet
-    Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
-    // we have all the columns loaded, so we are good to go.
-    if (toCover.isEmpty()) {
-      return;
-    }
-
-    // add the current state of the row
-    this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
-
-    // add the covered columns to the set
-    for (ColumnReference ref : toCover) {
-      this.columnSet.addColumn(ref);
-    }
-  }
-
-  @Override
-  public Map<String, byte[]> getUpdateAttributes() {
-    return this.update.getAttributesMap();
-  }
-
-  @Override
-  public byte[] getCurrentRowKey() {
-    return this.update.getRow();
-  }
-
-  public Result getCurrentRowState() {
-    KeyValueScanner scanner = this.memstore.getScanner();
-    List<Cell> kvs = new ArrayList<Cell>();
-    while (scanner.peek() != null) {
-      try {
-        kvs.add(scanner.next());
-      } catch (IOException e) {
-        // this should never happen - something has gone terribly arwy if it has
-        throw new RuntimeException("Local MemStore threw IOException!");
-      }
-    }
-    return Result.create(kvs);
-  }
-
-  /**
-   * Helper to add a {@link Mutation} to the values stored for the current row
-   * @param pendingUpdate update to apply
-   */
-  public void addUpdateForTesting(Mutation pendingUpdate) {
-    for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
-      List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
-      addUpdate(edits);
-    }
-  }
-
-  /**
-   * @param hints
-   */
-  public void setHints(List<? extends IndexedColumnGroup> hints) {
-    this.hints = hints;
-  }
-
-  @Override
-  public List<? extends IndexedColumnGroup> getIndexColumnHints() {
-    return this.hints;
-  }
-
-  @Override
-  public Collection<KeyValue> getPendingUpdate() {
-    return this.kvs;
-  }
-
-  /**
-   * Set the {@link KeyValue}s in the update for which we are currently building an index update,
-   * but don't actually apply them.
-   * @param update pending {@link KeyValue}s
-   */
-  public void setPendingUpdates(Collection<KeyValue> update) {
-    this.kvs.clear();
-    this.kvs.addAll(update);
-  }
-
-  /**
-   * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
-   */
-  public void applyPendingUpdates() {
-    this.addUpdate(kvs);
-  }
-
-  /**
-   * Rollback all the given values from the underlying state.
-   * @param values
-   */
-  public void rollback(Collection<KeyValue> values) {
-    for (KeyValue kv : values) {
-      this.memstore.rollback(kv);
-    }
-  }
+    private long ts;
+    private RegionCoprocessorEnvironment env;
+    private KeyValueStore memstore;
+    private LocalHBaseState table;
+    private Mutation update;
+    private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
+    private ScannerBuilder scannerBuilder;
+    private List<KeyValue> kvs = new ArrayList<KeyValue>();
+    private List<? extends IndexedColumnGroup> hints;
+    private CoveredColumns columnSet;
+
+    public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
+        this.env = environment;
+        this.table = table;
+        this.update = update;
+        this.memstore = new IndexMemStore();
+        this.scannerBuilder = new ScannerBuilder(memstore, update);
+        this.columnSet = new CoveredColumns();
+    }
+
+    public void addPendingUpdates(KeyValue... kvs) {
+        if (kvs == null) return;
+        addPendingUpdates(Arrays.asList(kvs));
+    }
+
+    public void addPendingUpdates(List<KeyValue> kvs) {
+        if (kvs == null) return;
+        setPendingUpdates(kvs);
+        addUpdate(kvs);
+    }
+
+    private void addUpdate(List<KeyValue> list) {
+        addUpdate(list, true);
+    }
+
+    private void addUpdate(List<KeyValue> list, boolean overwrite) {
+        if (list == null) return;
+        for (KeyValue kv : list) {
+            this.memstore.add(kv, overwrite);
+        }
+    }
+
+    @Override
+    public RegionCoprocessorEnvironment getEnvironment() {
+        return this.env;
+    }
+
+    @Override
+    public long getCurrentTimestamp() {
+        return this.ts;
+    }
+
+    /**
+     * Set the current timestamp up to which the table should allow access to the underlying table.
+     * This overrides the timestamp view provided by the indexer - use with care!
+     * @param timestamp timestamp up to which the table should allow access.
+     */
+    public void setCurrentTimestamp(long timestamp) {
+        this.ts = timestamp;
+    }
+
+    public void resetTrackedColumns() {
+        this.trackedColumns.clear();
+    }
+
+    public Set<ColumnTracker> getTrackedColumns() {
+        return this.trackedColumns;
+    }
+
+    /**
+     * Get a scanner on the columns that are needed by the index.
+     * <p>
+     * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
+     * columns with a timestamp earlier than the timestamp to which the table is currently set (the
+     * current state of the table for which we need to build an update).
+     * <p>
+     * If none of the passed columns matches any of the columns in the pending update (as determined
+     * by {@link ColumnReference#matchesFamily(byte[])} and
+     * {@link ColumnReference#matchesQualifier(byte[])}), then an empty scanner will be returned. This
+     * is because it doesn't make sense to build index updates when there is no change in the table
+     * state for any of the columns you are indexing.
+     * <p>
+     * <i>NOTE:</i> This method should <b>not</b> be used during
+     * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been
+     * applied - you are merely attempting to clean up the current state and therefore do <i>not</i>
+     * need to track the indexed columns.
+     * <p>
+     * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you
+     * request - you will never see a column with the timestamp we are tracking, but the next oldest
+     * timestamp for that column.
+     * @param indexedColumns the columns that will be indexed
+     * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
+     *         the builder. Even if no update is necessary for the requested columns, you still need
+     *         to return the {@link IndexUpdate}, just don't set the update for the
+     *         {@link IndexUpdate}.
+     * @throws IOException
+     */
+    public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+        Collection<? extends ColumnReference> indexedColumns) throws IOException {
+        ensureLocalStateInitialized(indexedColumns);
+        // filter out things with a newer timestamp and track the column references to which it applies
+        ColumnTracker tracker = new ColumnTracker(indexedColumns);
+        synchronized (this.trackedColumns) {
+            // we haven't seen this set of columns before, so we need to create a new tracker
+            if (!this.trackedColumns.contains(tracker)) {
+                this.trackedColumns.add(tracker);
+            }
+        }
+
+        Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
+
+        return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
+    }
+
+    /**
+     * Initialize the managed local state. Generally, this will only be called by
+     * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
+     * then, there is still fairly low contention as each new Put/Delete will have its own table state.
+     */
+    private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+            throws IOException {
+        // check to see if we haven't initialized any columns yet
+        Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
+        // we have all the columns loaded, so we are good to go.
+        if (toCover.isEmpty()) { return; }
+
+        // add the current state of the row
+        this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
+
+        // add the covered columns to the set
+        for (ColumnReference ref : toCover) {
+            this.columnSet.addColumn(ref);
+        }
+    }
+
+    @Override
+    public Map<String, byte[]> getUpdateAttributes() {
+        return this.update.getAttributesMap();
+    }
+
+    @Override
+    public byte[] getCurrentRowKey() {
+        return this.update.getRow();
+    }
+
+    public Result getCurrentRowState() {
+        KeyValueScanner scanner = this.memstore.getScanner();
+        List<Cell> kvs = new ArrayList<Cell>();
+        while (scanner.peek() != null) {
+            try {
+                kvs.add(scanner.next());
+            } catch (IOException e) {
+                // this should never happen - something has gone terribly awry if it has
+                throw new RuntimeException("Local MemStore threw IOException!");
+            }
+        }
+        return Result.create(kvs);
+    }
+
+    /**
+     * Helper to add a {@link Mutation} to the values stored for the current row
+     * 
+     * @param pendingUpdate
+     *            update to apply
+     */
+    public void addUpdateForTesting(Mutation pendingUpdate) {
+        for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
+            List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
+            addUpdate(edits);
+        }
+    }
+
+    /**
+     * @param hints hints for the indexed column groups, used when rolling up through the row's history
+     */
+    public void setHints(List<? extends IndexedColumnGroup> hints) {
+        this.hints = hints;
+    }
+
+    @Override
+    public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+        return this.hints;
+    }
+
+    @Override
+    public Collection<KeyValue> getPendingUpdate() {
+        return this.kvs;
+    }
+
+    /**
+     * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually
+     * apply them.
+     * 
+     * @param update
+     *            pending {@link KeyValue}s
+     */
+    public void setPendingUpdates(Collection<KeyValue> update) {
+        this.kvs.clear();
+        this.kvs.addAll(update);
+    }
+
+    /**
+     * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
+     */
+    public void applyPendingUpdates() {
+        this.addUpdate(kvs);
+    }
+
+    /**
+     * Rollback all the given values from the underlying state.
+     * 
+     * @param values
+     */
+    public void rollback(Collection<KeyValue> values) {
+        for (KeyValue kv : values) {
+            this.memstore.rollback(kv);
+        }
+    }
+
+    @Override
+    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+            throws IOException {
+        Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+        ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
+        return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
+    }
 }
\ No newline at end of file

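The new getIndexUpdateState method above is what codec implementations call to read the current row state: it wraps the pre-seeked Scanner from getIndexedColumnsTableState in a ValueGetter. A minimal sketch of a codec-side caller, assuming only what this patch shows (the Pair<ValueGetter, IndexUpdate> shape and the getLatestValue signature from the LazyValueGetter change below); the class and method names here are hypothetical:

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.hbase.index.ValueGetter;
    import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    import org.apache.phoenix.hbase.index.covered.TableState;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    public class ExampleIndexUpdateCaller {
        // Hypothetical helper: read the latest value of each indexed column and
        // hand back the IndexUpdate tracker for the builder.
        IndexUpdate buildUpsert(TableState state, Collection<? extends ColumnReference> indexed)
                throws IOException {
            // One call yields both the value-lookup interface and the update tracker.
            Pair<ValueGetter, IndexUpdate> stateInfo = state.getIndexUpdateState(indexed);
            ValueGetter getter = stateInfo.getFirst();
            for (ColumnReference ref : indexed) {
                ImmutableBytesWritable value = getter.getLatestValue(ref);
                // ... fold each latest value into the index row key / covered columns ...
            }
            return stateInfo.getSecond();
        }
    }
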
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
new file mode 100644
index 0000000..27af40f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+/**
+ * Build covered indexes for phoenix updates.
+ * <p>
+ * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't need to do any extra
+ * synchronization in the IndexBuilder.
+ * <p>
+ * NOTE: This implementation doesn't clean up the index when we remove a key-value on compaction or flush, leading to a
+ * bloated index that needs to be cleaned up by a background process.
+ */
+public class NonTxIndexBuilder extends BaseIndexBuilder {
+    private static final Log LOG = LogFactory.getLog(NonTxIndexBuilder.class);
+
+    protected LocalHBaseState localTable;
+
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        this.localTable = new LocalTable(env);
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException {
+        // create a state manager, so we can manage each batch
+        LocalTableState state = new LocalTableState(env, localTable, mutation);
+        // build the index updates for each group
+        IndexUpdateManager manager = new IndexUpdateManager();
+
+        batchMutationAndAddUpdates(manager, state, mutation, indexMetaData);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Found index updates for Mutation: " + mutation + "\n" + manager);
+        }
+
+        return manager.toMap();
+    }
+
+    /**
+     * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each key-value in the
+     * update to see if it matches the others. Generally, this will be the case, but you can add kvs to a mutation that
+     * don't all have the same timestamp, so we need to manage everything in batches based on timestamp.
+     * <p>
+     * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
+     * @param state
+     *            current state of the row for the mutation.
+     * @param m
+     *            mutation to batch
+     * @param indexMetaData TODO
+     * @param updateMap
+     *            index updates into which to add new updates. Modified as a side-effect.
+     * 
+     * @throws IOException
+     */
+    private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException {
+        // split the mutation into timestamp-based batches
+        Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+
+        // go through each batch of keyvalues and build separate index entries for each
+        boolean cleanupCurrentState = true;
+        for (Batch batch : batches) {
+            /*
+             * We have to split the work between the cleanup and the update for each group because when we update the
+             * current state of the row for the current batch (appending the mutations for the current batch) the next
+             * group will see that as the current state, which can cause a delete and a put to be created for
+             * the next group.
+             */
+            if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) {
+                cleanupCurrentState = false;
+            }
+        }
+    }
+
+    /**
+     * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp
+     * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
+     * 
+     * @param m
+     *            {@link Mutation} from which to extract the {@link KeyValue}s
+     * @return the mutation, broken into batches and sorted in ascending order (smallest first)
+     */
+    protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
+        Map<Long, Batch> batches = new HashMap<Long, Batch>();
+        for (List<Cell> family : m.getFamilyCellMap().values()) {
+            List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
+            createTimestampBatchesFromKeyValues(familyKVs, batches);
+        }
+        // sort the batches
+        List<Batch> sorted = new ArrayList<Batch>(batches.values());
+        Collections.sort(sorted, new Comparator<Batch>() {
+            @Override
+            public int compare(Batch o1, Batch o2) {
+                return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+            }
+        });
+        return sorted;
+    }
+
+    /**
+     * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a
+     * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
+     * 
+     * @param kvs
+     *            {@link KeyValue}s to break into batches
+     * @param batches
+     *            to update with the given kvs
+     */
+    protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) {
+        long now = EnvironmentEdgeManager.currentTime();
+        byte[] nowBytes = Bytes.toBytes(now);
+
+        // batch kvs by timestamp
+        for (KeyValue kv : kvs) {
+            long ts = kv.getTimestamp();
+            // override the timestamp to the current time, so the index and primary tables match
+            // all the keys with LATEST_TIMESTAMP will then be put into the same batch
+            if (kv.updateLatestStamp(nowBytes)) {
+                ts = now;
+            }
+            Batch batch = batches.get(ts);
+            if (batch == null) {
+                batch = new Batch(ts);
+                batches.put(ts, batch);
+            }
+            batch.add(kv);
+        }
+    }
+
+    /**
+     * For a single batch, get all the index updates and add them to the updateMap
+     * <p>
+     * This method manages cleaning up the entire history of the row from the given timestamp forward for out-of-order
+     * (e.g. 'back in time') updates.
+     * <p>
+     * If things arrive out of order (client is using custom timestamps) we should still see the index in the correct
+     * order (assuming we scan after the out-of-order update is finished). Therefore, when we aren't the most recent
+     * update to the index, we need to delete the state at the current timestamp (similar to above), but also issue a
+     * delete for the added index updates at the next newest timestamp of any of the columns in the update; we need to
+     * clean up the insert so it looks like it was also deleted at that next newest timestamp. However, it's not enough to
+     * just update the one in front of us - that column will likely be applied to index entries up the entire history in
+     * front of us, which also needs to be fixed up.
+     * <p>
+     * However, the current update usually will be the most recent thing to be added. In that case, all we need to do is
+     * issue a delete for the previous index row (the state of the row, without the update applied) at the current
+     * timestamp. This gets rid of anything currently in the index for the current state of the row (at the timestamp).
+     * Then we can just follow that by applying the pending update and building the index update based on the new row
+     * state.
+     * 
+     * @param updateMap
+     *            map to update with new index elements
+     * @param batch
+     *            timestamp-based batch of edits
+     * @param state
+     *            local state to update and pass to the codec
+     * @param requireCurrentStateCleanup
+     *            <tt>true</tt> if we should attempt to clean up the current state of the table, in the event of a
+     *            'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier
+     *            batch already did the cleanup.
+     * @param indexMetaData TODO
+     * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt>
+     *         otherwise
+     * @throws IOException
+     */
+    private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
+            boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException {
+
+        // need a temporary manager for the current batch. It should resolve any conflicts for the
+        // current batch. Essentially, we can get the case where a batch doesn't change the current
+        // state of the index (all Puts are covered by deletes), in which case we don't want to add
+        // anything
+        // A. Get the correct values for the pending state in the batch
+        // A.1 start by cleaning up the current state - as long as there are key-values in the batch
+        // that are indexed, we need to change the current state of the index. It's up to the codec to
+        // determine if we need to make any cleanup given the pending update.
+        long batchTs = batch.getTimestamp();
+        state.setPendingUpdates(batch.getKvs());
+        addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
+
+        // A.2 do a single pass first for the updates to the current state
+        state.applyPendingUpdates();
+        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData);
+        // if all the updates are the latest thing in the index, we are done - don't go and fix history
+        if (ColumnTracker.isNewestTime(minTs)) { return false; }
+
+        // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
+        // index. after this, we have the correct view of the index, from the batch up to the index
+        while (!ColumnTracker.isNewestTime(minTs)) {
+            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData);
+        }
+
+        // B. only clean up the current state if we need to - it's a huge waste of effort otherwise.
+        if (requireCurrentStateCleanup) {
+            // roll back the pending update. This is needed so we can remove all the 'old' index entries.
+            // We don't need to do the puts here, but just the deletes at the given timestamps since we
+            // just want to completely hide the incorrect entries.
+            state.rollback(batch.getKvs());
+            // setup state
+            state.setPendingUpdates(batch.getKvs());
+
+            // clean up the pending batch. If anything in the correct history is covered by Deletes used to
+            // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
+            // because the update may have a different set of columns or value based on the update).
+            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData);
+
+            // have to roll the state forward again, so the current state is correct
+            state.applyPendingUpdates();
+            return true;
+        }
+        return false;
+    }
+
+    private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData)
+            throws IOException {
+        state.setCurrentTimestamp(ts);
+        ts = addCurrentStateMutationsForBatch(updateMap, state, indexMetaData);
+        return ts;
+    }
+
+    private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
+            throws IOException {
+        // get the cleanup for the current state
+        state.setCurrentTimestamp(batchTs);
+        addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
+        // ignore any index tracking from the delete
+        state.resetTrackedColumns();
+    }
+
+    /**
+     * Add the necessary mutations for the pending batch on the local state. Handles rolling up through history to
+     * determine the index changes after applying the batch (for the case where the batch is back in time).
+     * 
+     * @param updateMap
+     *            to update with index mutations
+     * @param state
+     *            current state of the table
+     * @param indexMetaData TODO
+     * @param batch
+     *            to apply to the current state
+     * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)}
+     *         returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>.
+     * @throws IOException
+     */
+    private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state, IndexMetaData indexMetaData)
+            throws IOException {
+
+        // get the index updates for this current batch
+        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData);
+        state.resetTrackedColumns();
+
+        /*
+         * go through all the pending updates. If we are sure that all the entries are the latest timestamp, we can just
+         * add the index updates and move on. However, if there are columns that we skip past (based on the timestamp of
+         * the batch), we need to roll back up the history. Regardless of whether or not they are the latest timestamp,
+         * the entries here are going to be correct for the current batch timestamp, so we add them to the updates. The
+         * only thing we really care about is if we need to roll up the history and fix it as we go.
+         */
+        // timestamp of the next update we need to track
+        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+        List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
+        for (IndexUpdate update : upserts) {
+            // this is the one bit where we check the timestamps
+            final ColumnTracker tracker = update.getIndexedColumns();
+            long trackerTs = tracker.getTS();
+            // update the next min TS we need to track
+            if (trackerTs < minTs) {
+                minTs = tracker.getTS();
+            }
+            // track index hints for the next round. Hint if we need an update for that column for the
+            // next timestamp. These columns clearly won't need to update as we go through time as they
+            // already match the most recent possible thing.
+            boolean needsCleanup = false;
+            if (tracker.hasNewerTimestamps()) {
+                columnHints.add(tracker);
+                // this update also needs to be cleaned up at the next timestamp because it is not the latest.
+                needsCleanup = true;
+            }
+
+            // only make the put if the index update has been setup
+            if (update.isValid()) {
+                byte[] table = update.getTableName();
+                Mutation mutation = update.getUpdate();
+                updateMap.addIndexUpdate(table, mutation);
+
+                // only make the cleanup if we made a put and need cleanup
+                if (needsCleanup) {
+                    // there is a TS for the interested columns that is greater than the columns in the
+                    // put. Therefore, we need to issue a delete at the same timestamp
+                    Delete d = new Delete(mutation.getRow());
+                    d.setTimestamp(tracker.getTS());
+                    updateMap.addIndexUpdate(table, d);
+                }
+            }
+        }
+        return minTs;
+    }
+
+    /**
+     * Cleanup the index based on the current state from the given batch. Iterates over each timestamp (for the indexed
+     * rows) for the current state of the table and cleans up all the existing entries generated by the codec.
+     * <p>
+     * Adds all pending updates to the updateMap
+     * 
+     * @param updateMap
+     *            updated with the pending index updates from the codec
+     * @param batchTs
+     *            timestamp from which we should clean up
+     * @param state
+     *            current state of the primary table. Should already be set up to the correct state from which we want to
+     *            clean up.
+     * @param indexMetaData TODO
+     * @throws IOException
+     */
+    private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
+            throws IOException {
+        // get the cleanup for the current state
+        state.setCurrentTimestamp(batchTs);
+        addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
+        Set<ColumnTracker> trackers = state.getTrackedColumns();
+        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+        for (ColumnTracker tracker : trackers) {
+            if (tracker.getTS() < minTs) {
+                minTs = tracker.getTS();
+            }
+        }
+        state.resetTrackedColumns();
+        if (!ColumnTracker.isNewestTime(minTs)) {
+            state.setHints(Lists.newArrayList(trackers));
+            cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData);
+        }
+    }
+
+    /**
+     * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
+     * update map.
+     * <p>
+     * Expects the {@link LocalTableState} to already be correctly set up (correct timestamp, updates applied, etc).
+     * @param indexMetaData TODO
+     * 
+     * @throws IOException
+     */
+    protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
+            throws IOException {
+        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData);
+        if (cleanup != null) {
+            for (IndexUpdate d : cleanup) {
+                if (!d.isValid()) {
+                    continue;
+                }
+                // override the timestamps in the delete to match the current batch.
+                Delete remove = (Delete)d.getUpdate();
+                remove.setTimestamp(ts);
+                updateMap.addIndexUpdate(d.getTableName(), remove);
+            }
+        }
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData indexMetaData)
+            throws IOException {
+        // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
+        return null;
+    }
+}
\ No newline at end of file

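The batching step above is the core of the builder: every KeyValue still carrying HConstants.LATEST_TIMESTAMP is rewritten to the server's current time, and the cells are then grouped per timestamp so each batch is indexed against a consistent view of the row, in ascending timestamp order. The same grouping logic in a self-contained sketch, using stand-in types instead of HBase's KeyValue and the Batch class above:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.TreeMap;

    public class TimestampBatchingSketch {
        static final long LATEST_TIMESTAMP = Long.MAX_VALUE; // mirrors HConstants.LATEST_TIMESTAMP

        // Stand-in for a Cell/KeyValue: just a mutable timestamp.
        static class CellStub {
            long timestamp;
            CellStub(long ts) { this.timestamp = ts; }
        }

        // Group cells by timestamp, substituting "now" for LATEST_TIMESTAMP so the
        // index and primary table agree, as createTimestampBatchesFromKeyValues does.
        static Collection<List<CellStub>> batchByTimestamp(List<CellStub> cells, long now) {
            TreeMap<Long, List<CellStub>> batches = new TreeMap<Long, List<CellStub>>();
            for (CellStub cell : cells) {
                if (cell.timestamp == LATEST_TIMESTAMP) {
                    cell.timestamp = now; // all LATEST_TIMESTAMP cells land in the same batch
                }
                List<CellStub> batch = batches.get(cell.timestamp);
                if (batch == null) {
                    batch = new ArrayList<CellStub>();
                    batches.put(cell.timestamp, batch);
                }
                batch.add(cell);
            }
            // a TreeMap iterates its keys in ascending order, standing in for the
            // explicit sort in createTimestampBatchesFromMutation
            return batches.values();
        }
    }
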
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 4c4d0b0..0e961db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,14 +23,14 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
 
 /**
 * Interface for the current state of the table. This is generally going to be as of a timestamp - a
@@ -52,46 +52,14 @@ public interface TableState {
   public long getCurrentTimestamp();
 
   /**
-   * Set the current timestamp up to which the table should allow access to the underlying table.
-   * This overrides the timestamp view provided by the indexer - use with care!
-   * @param timestamp timestamp up to which the table should allow access.
-   */
-  public void setCurrentTimestamp(long timestamp);
-
-  /**
   * @return the attributes attached to the current update (e.g. {@link Mutation}).
    */
   public Map<String, byte[]> getUpdateAttributes();
 
   /**
-   * Get a scanner on the columns that are needed by the index.
-   * <p>
-   * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
-   * columns with a timestamp earlier than the timestamp to which the table is currently set (the
-   * current state of the table for which we need to build an update).
-   * <p>
-   * If none of the passed columns matches any of the columns in the pending update (as determined
-   * by {@link ColumnReference#matchesFamily(byte[])} and
-   * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This
-   * is because it doesn't make sense to build index updates when there is no change in the table
-   * state for any of the columns you are indexing.
-   * <p>
-   * <i>NOTE:</i> This method should <b>not</b> be used during
-   * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
-   * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
-   * need to track the indexed columns.
-   * <p>
-   * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you
-   * request - you will never see a column with the timestamp we are tracking, but the next oldest
-   * timestamp for that column.
-   * @param indexedColumns the columns to that will be indexed
-   * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
-   *         the builder. Even if no update is necessary for the requested columns, you still need
-   *         to return the {@link IndexUpdate}, just don't set the update for the
-   *         {@link IndexUpdate}.
-   *         {@link IndexUpdate}.
-   * @throws IOException
+   * Get a getter interface for the state of the index row
    */
-  Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+  Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
       Collection<? extends ColumnReference> indexedColumns) throws IOException;
 
   /**

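The interface change above replaces the Scanner-returning getIndexedColumnsTableState with getIndexUpdateState, so IndexCodec implementations receive a ValueGetter rather than driving a Scanner themselves. For unit-testing a codec against the new contract, an in-memory ValueGetter might look like the sketch below. This assumes ValueGetter declares only the getLatestValue(ColumnReference) method visible in the LazyValueGetter diff that follows; any other methods on the real interface would need stubs as well:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.hbase.index.ValueGetter;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    // Hypothetical test double: serves values from a map instead of a region scanner.
    public class MapValueGetter implements ValueGetter {
        private final Map<ColumnReference, ImmutableBytesWritable> values =
                new HashMap<ColumnReference, ImmutableBytesWritable>();

        public void put(ColumnReference ref, byte[] value) {
            values.put(ref, new ImmutableBytesWritable(value));
        }

        @Override
        public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
            return values.get(ref); // null means no value for that column, as in LazyValueGetter
        }
    }
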
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
index 7fbc7f3..52076a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -24,7 +24,8 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
@@ -37,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 public class LazyValueGetter implements ValueGetter {
 
   private Scanner scan;
-  private volatile Map<ColumnReference, ImmutableBytesPtr> values;
+  private volatile Map<ColumnReference, ImmutableBytesWritable> values;
   private byte[] row;
   
   /**
@@ -51,16 +52,16 @@ public class LazyValueGetter implements ValueGetter {
   }
 
   @Override
-  public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+  public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
     // ensure we have a backing map
     if (values == null) {
       synchronized (this) {
-        values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesPtr>());
+        values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesWritable>());
       }
     }
 
     // check the value in the map
-    ImmutableBytesPtr value = values.get(ref);
+    ImmutableBytesWritable value = values.get(ref);
     if (value == null) {
       value = get(ref);
       values.put(ref, value);
@@ -80,7 +81,7 @@ public class LazyValueGetter implements ValueGetter {
     }
    // there is a next value - we only care about the current value, so we can just snag that
     Cell next = scan.next();
-    if (ref.matches(next)) {
+    if (ref.matches(KeyValueUtil.ensureKeyValue(next))) {
      return new ImmutableBytesPtr(next.getValueArray(), next.getValueOffset(), next.getValueLength());
     }
     return null;

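One thing to note in the hunk above: getLatestValue lazily creates the backing map under synchronized (this), but never re-checks values inside the lock, so two racing threads can each install a fresh map and one thread's cached entries are silently discarded. That is harmless here (the map is only a cache over the scanner), but the conventional double-checked form, sketched generically below, avoids the race:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class LazyCacheSketch<K, V> {
        private volatile Map<K, V> cache;

        private Map<K, V> cache() {
            Map<K, V> result = cache;
            if (result == null) {
                synchronized (this) {
                    result = cache;
                    if (result == null) { // re-check so a racing thread can't replace an in-use map
                        result = Collections.synchronizedMap(new HashMap<K, V>());
                        cache = result;
                    }
                }
            }
            return result;
        }

        // Usage mirrors LazyValueGetter: consult the cache, fall back to a computed value.
        public V getOrCompute(K key, V computed) {
            Map<K, V> c = cache();
            V value = c.get(key);
            if (value == null) {
                value = computed; // stand-in for the expensive lookup (a scanner seek above)
                c.put(key, value);
            }
            return value;
        }
    }
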
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 4b953e6..4efca9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -1,19 +1,11 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
@@ -32,337 +24,341 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
-import org.apache.phoenix.index.BaseIndexCodec;
+
+import com.google.common.collect.Lists;
 
 /**
  *
  */
 public class CoveredColumnIndexCodec extends BaseIndexCodec {
 
-  private static final byte[] EMPTY_BYTES = new byte[0];
-  public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
 
-  private List<ColumnGroup> groups;
+    private List<ColumnGroup> groups;
 
-  /**
-   * @param groups to initialize the codec with
-   * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing
-   *         purposes
-   */
-  public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
-    CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
-    codec.groups = Lists.newArrayList(groups);
-    return codec;
-  }
-
-  @Override
-  public void initialize(RegionCoprocessorEnvironment env) {
-    groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
-  }
-
-  @Override
-  public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
-    List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
-    for (ColumnGroup group : groups) {
-      IndexUpdate update = getIndexUpdateForGroup(group, state);
-      updates.add(update);
+    /**
+     * @param groups
+     *            to initialize the codec with
+     * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes
+     */
+    public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
+        CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
+        codec.groups = Lists.newArrayList(groups);
+        return codec;
     }
-    return updates;
-  }
 
-  /**
-   * @param group
-   * @param state
-   * @return the update that should be made to the table
-   */
-  private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
-    List<CoveredColumn> refs = group.getColumns();
-    try {
-      Pair<Scanner, IndexUpdate> stateInfo = state.getIndexedColumnsTableState(refs);
-      Scanner kvs = stateInfo.getFirst();
-      Pair<Integer, List<ColumnEntry>> columns =
-          getNextEntries(refs, kvs, state.getCurrentRowKey());
-      // make sure we close the scanner
-      kvs.close();
-      if (columns.getFirst().intValue() == 0) {
-        return stateInfo.getSecond();
-      }
-      // have all the column entries, so just turn it into a Delete for the row
-      // convert the entries to the needed values
-      byte[] rowKey =
-          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
-      Put p = new Put(rowKey, state.getCurrentTimestamp());
-      // add the columns to the put
-      addColumnsToPut(p, columns.getSecond());
-
-      // update the index info
-      IndexUpdate update = stateInfo.getSecond();
-      update.setTable(Bytes.toBytes(group.getTable()));
-      update.setUpdate(p);
-      return update;
-    } catch (IOException e) {
-      throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
+    @Override
+    public void initialize(RegionCoprocessorEnvironment env) {
+        groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
     }
-  }
 
-  private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
-    // add each of the corresponding families to the put
-    int count = 0;
-    for (ColumnEntry column : columns) {
-      indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
-        ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
-    }
+    @Override
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
+        List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+        for (ColumnGroup group : groups) {
+            IndexUpdate update = getIndexUpdateForGroup(group, state);
+            updates.add(update);
+        }
+        return updates;
   }
 
-  private static byte[] toIndexQualifier(CoveredColumn column) {
-    return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR),
-      column.getQualifier());
-  }
+    /**
+     * @param group
+     * @param state
+     * @return the update that should be made to the table
+     */
+    private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Scanner kvs = stateInfo.getFirst();
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
+            // make sure we close the scanner
+            kvs.close();
+            if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); }
+            // have all the column entries, so just turn it into a Delete for the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+            Put p = new Put(rowKey, state.getCurrentTimestamp());
+            // add the columns to the put
+            addColumnsToPut(p, columns.getSecond());
+
+            // update the index info
+            IndexUpdate update = stateInfo.getSecond();
+            update.setTable(Bytes.toBytes(group.getTable()));
+            update.setUpdate(p);
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
+        }
+    }
 
-  @Override
-  public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
-    List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
-    for (ColumnGroup group : groups) {
-      deletes.add(getDeleteForGroup(group, state));
+    private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
+        // add each of the corresponding families to the put
+        int count = 0;
+        for (ColumnEntry column : columns) {
+            indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
+                    ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
+        }
     }
-    return deletes;
-  }
 
+    private static byte[] toIndexQualifier(CoveredColumn column) {
+        return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier());
+    }
 
-  /**
-   * Get all the deletes necessary for a group of columns - logically, the cleanup the index table
-   * for a given index.
-   * @param group index information
-   * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
-   */
-  private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
-    List<CoveredColumn> refs = group.getColumns();
-    try {
-      Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs);
-      Pair<Integer, List<ColumnEntry>> columns =
-          getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
-      // make sure we close the scanner reference
-      kvs.getFirst().close();
-      // no change, just return the passed update
-      if (columns.getFirst() == 0) {
-        return kvs.getSecond();
-      }
-      // have all the column entries, so just turn it into a Delete for the row
-      // convert the entries to the needed values
-      byte[] rowKey =
-          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
-      Delete d = new Delete(rowKey);
-      d.setTimestamp(state.getCurrentTimestamp());
-      IndexUpdate update = kvs.getSecond();
-      update.setUpdate(d);
-      update.setTable(Bytes.toBytes(group.getTable()));
-      return update;
-    } catch (IOException e) {
-      throw new RuntimeException("Unexpected exception when getting state for 
columns: " + refs);
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
+        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+        for (ColumnGroup group : groups) {
+            deletes.add(getDeleteForGroup(group, state));
+        }
+        return deletes;
     }
-  }
 
-  /**
-   * Get the next batch of primary table values for the given columns
-   * @param refs columns to match against
-   * @param kvs
-   * @param currentRow
-   * @return the total length of all values found and the entries to add for the index
-   */
-  private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs,
-      byte[] currentRow) throws IOException {
-    int totalValueLength = 0;
-    List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
-
-    // pull out the latest state for each column reference, in order
-    for (CoveredColumn ref : refs) {
-      KeyValue first = ref.getFirstKeyValueForRow(currentRow);
-      if (!kvs.seek(first)) {
-        // no more keys, so add a null value
-        entries.add(new ColumnEntry(null, ref));
-        continue;
-      }
-      // there is a next value - we only care about the current value, so we can just snag that
-      Cell next = kvs.next();
-      if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
-        byte[] v = next.getValue();
-        totalValueLength += v.length;
-        entries.add(new ColumnEntry(v, ref));
-      } else {
-        // this first one didn't match at all, so we have to put in a null entry
-        entries.add(new ColumnEntry(null, ref));
-        continue;
-      }
-      // here's where is gets a little tricky - we either need to decide if we should continue
-      // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
-      if (!ref.allColumns()) {
-        continue;
-      }
-      // matches all columns, so we need to iterate until we hit the next column with the same
-      // family as the current key
-      byte[] lastQual = next.getQualifier();
-      byte[] nextQual = null;
-      while ((next = kvs.next()) != null) {
-        // different family, done with this column
-        if (!ref.matchesFamily(next.getFamily())) {
-          break;
+    /**
+     * Get all the deletes necessary for a group of columns - logically, the cleanup of the index table for a given
+     * index.
+     * 
+     * @param group
+     *            index information
+     * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
+     */
+    private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
+            // make sure we close the scanner reference
+            kvs.getFirst().close();
+            // no change, just return the passed update
+            if (columns.getFirst() == 0) { return kvs.getSecond(); }
+            // have all the column entries, so just turn it into a Delete for the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+            Delete d = new Delete(rowKey);
+            d.setTimestamp(state.getCurrentTimestamp());
+            IndexUpdate update = kvs.getSecond();
+            update.setUpdate(d);
+            update.setTable(Bytes.toBytes(group.getTable()));
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting state for columns: " + refs, e);
         }
-        nextQual = next.getQualifier();
-        // we are still on the same qualifier - skip it, since we already added a column for it
-        if (Arrays.equals(lastQual, nextQual)) {
-          continue;
+    }
+
+    /**
+     * Get the next batch of primary table values for the given columns
+     * 
+     * @param refs
+     *            columns to match against
+     * @param kvs
+     *            scanner over the current row state
+     * @param currentRow
+     *            key of the row being indexed
+     * @return the total length of all values found and the entries to add for the index
+     */
+    private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
+            throws IOException {
+        int totalValueLength = 0;
+        List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
+
+        // pull out the latest state for each column reference, in order
+        for (CoveredColumn ref : refs) {
+            KeyValue first = ref.getFirstKeyValueForRow(currentRow);
+            if (!kvs.seek(first)) {
+                // no more keys, so add a null value
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // there is a next value - we only care about the current value, so we can just snag that
+            Cell next = kvs.next();
+            if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref));
+            } else {
+                // this first one didn't match at all, so we have to put in a null entry
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // here's where it gets a little tricky - we either need to decide if we should continue
+            // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
+            if (!ref.allColumns()) {
+                continue;
+            }
+            // matches all columns, so we need to iterate until we hit the next column with the same
+            // family as the current key
+            byte[] lastQual = next.getQualifier();
+            byte[] nextQual = null;
+            while ((next = kvs.next()) != null) {
+                // different family, done with this column
+                if (!ref.matchesFamily(next.getFamily())) {
+                    break;
+                }
+                nextQual = next.getQualifier();
+                // we are still on the same qualifier - skip it, since we already added a column for it
+                if (Arrays.equals(lastQual, nextQual)) {
+                    continue;
+                }
+                // this must match the qualifier since it's an all-qualifiers specifier, so we add it
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref));
+                // update the last qualifier to check against
+                lastQual = nextQual;
+            }
         }
-        // this must match the qualifier since its an all-qualifiers specifier, so we add it
-        byte[] v = next.getValue();
-        totalValueLength += v.length;
-        entries.add(new ColumnEntry(v, ref));
-        // update the last qualifier to check against
-        lastQual = nextQual;
-      }
+        return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
     }
-    return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
-  }
 
-  static class ColumnEntry {
-    byte[] value = EMPTY_BYTES;
-    CoveredColumn ref;
+    static class ColumnEntry {
+        byte[] value = EMPTY_BYTES;
+        CoveredColumn ref;
 
-    public ColumnEntry(byte[] value, CoveredColumn ref) {
-      this.value = value == null ? EMPTY_BYTES : value;
-      this.ref = ref;
+        public ColumnEntry(byte[] value, CoveredColumn ref) {
+            this.value = value == null ? EMPTY_BYTES : value;
+            this.ref = ref;
+        }
     }
-  }
 
-  /**
-   * Compose the final index row key.
-   * <p>
-   * This is faster than adding each value independently as we can just build a single a array and
-   * copy everything over once.
-   * @param pk primary key of the original row
-   * @param length total number of bytes of all the values that should be added
-   * @param values to use when building the key
-   */
-  static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
-    // now build up expected row key, each of the values, in order, followed by the PK and then some
-    // info about lengths so we can deserialize each value
-    byte[] output = new byte[length + pk.length];
-    int pos = 0;
-    int[] lengths = new int[values.size()];
-    int i = 0;
-    for (ColumnEntry entry : values) {
-      byte[] v = entry.value;
-      // skip doing the copy attempt, if we don't need to
-      if (v.length != 0) {
-        System.arraycopy(v, 0, output, pos, v.length);
-        pos += v.length;
-      }
-      lengths[i++] = v.length;
-    }
+    /**
+     * Compose the final index row key.
+     * <p>
+     * This is faster than adding each value independently as we can just build a single array and copy everything
+     * over once.
+     * 
+     * @param pk
+     *            primary key of the original row
+     * @param length
+     *            total number of bytes of all the values that should be added
+     * @param values
+     *            to use when building the key
+     */
+    static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+        // now build up expected row key, each of the values, in order, followed by the PK and then some
+        // info about lengths so we can deserialize each value
+        byte[] output = new byte[length + pk.length];
+        int pos = 0;
+        int[] lengths = new int[values.size()];
+        int i = 0;
+        for (ColumnEntry entry : values) {
+            byte[] v = entry.value;
+            // skip doing the copy attempt, if we don't need to
+            if (v.length != 0) {
+                System.arraycopy(v, 0, output, pos, v.length);
+                pos += v.length;
+            }
+            lengths[i++] = v.length;
+        }
+
+        // add the primary key to the end of the row key
+        System.arraycopy(pk, 0, output, pos, pk.length);
 
-    // add the primary key to the end of the row key
-    System.arraycopy(pk, 0, output, pos, pk.length);
+        // add the lengths as suffixes so we can deserialize the elements again
+        for (int l : lengths) {
+            output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+        }
 
-    // add the lengths as suffixes so we can deserialize the elements again
-    for (int l : lengths) {
-      output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+        // and the last integer is the number of values
+        return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
     }
 
-    // and the last integer is the number of values
-    return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
-  }
+    /**
+     * Essentially a short-cut from building a {@link Put}.
+     * 
+     * @param pk
+     *            row key
+     * @param timestamp
+     *            timestamp of all the keyvalues
+     * @param values
+     *            expected value--column pairs
+     * @return the keyvalues that the index contains for a given row at a timestamp with the given value--column pairs.
+     */
+    public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
+            List<Pair<byte[], CoveredColumn>> values) {
+
+        int length = 0;
+        List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
+        for (Pair<byte[], CoveredColumn> value : values) {
+            ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
+            length += value.getFirst().length;
+            expected.add(entry);
+        }
 
-  /**
-   * Essentially a short-cut from building a {@link Put}.
-   * @param pk row key
-   * @param timestamp timestamp of all the keyvalues
-   * @param values expected value--column pair
-   * @return a keyvalues that the index contains for a given row at a timestamp with the given value
-   *         -- column pairs.
-   */
-  public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
-      List<Pair<byte[], CoveredColumn>> values) {
-  
-    int length = 0;
-    List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
-    for (Pair<byte[], CoveredColumn> value : values) {
-      ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
-      length += value.getFirst().length;
-      expected.add(entry);
-    }
-  
-    byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
-    Put p = new Put(rowKey, timestamp);
-    CoveredColumnIndexCodec.addColumnsToPut(p, expected);
-    List<KeyValue> kvs = new ArrayList<KeyValue>();
-    for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
-      kvs.addAll(entry.getValue());
-    }
-  
-    return kvs;
-  }
+        byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
+        Put p = new Put(rowKey, timestamp);
+        CoveredColumnIndexCodec.addColumnsToPut(p, expected);
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
+            kvs.addAll(entry.getValue());
+        }
 
-  public static List<byte[]> getValues(byte[] bytes) {
-    // get the total number of keys in the bytes
-    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
-    List<byte[]> keys = new ArrayList<byte[]>(keyCount);
-    int[] lengths = new int[keyCount];
-    int lengthPos = keyCount - 1;
-    int pos = bytes.length - Bytes.SIZEOF_INT;
-    // figure out the length of each key
-    for (int i = 0; i < keyCount; i++) {
-      lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
-      pos -= Bytes.SIZEOF_INT;
+        return kvs;
     }
 
-    int current = 0;
-    for (int length : lengths) {
-      byte[] key = Arrays.copyOfRange(bytes, current, current + length);
-      keys.add(key);
-      current += length;
-    }
+    public static List<byte[]> getValues(byte[] bytes) {
+        // get the total number of keys in the bytes
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+        List<byte[]> keys = new ArrayList<byte[]>(keyCount);
+        int[] lengths = new int[keyCount];
+        int lengthPos = keyCount - 1;
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        // figure out the length of each key
+        for (int i = 0; i < keyCount; i++) {
+            lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            pos -= Bytes.SIZEOF_INT;
+        }
 
-    return keys;
-  }
+        int current = 0;
+        for (int length : lengths) {
+            byte[] key = Arrays.copyOfRange(bytes, current, current + length);
+            keys.add(key);
+            current += length;
+        }
 
-  /**
-   * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
-   * @param bytes array to read from
-   * @param start start point, backwards from which to read. For example, if specifying "25", we
-   *          would try to read an integer from 21 -> 25
-   * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists.
-   */
-  private static int getPreviousInteger(byte[] bytes, int start) {
-    return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
-  }
+        return keys;
+    }
 
-  /**
-   * Check to see if an row key just contains a list of null values.
-   * @param bytes row key to examine
-   * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
-   */
-  public static boolean checkRowKeyForAllNulls(byte[] bytes) {
-    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
-    int pos = bytes.length - Bytes.SIZEOF_INT;
-    for (int i = 0; i < keyCount; i++) {
-      int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
-      if (next > 0) {
-        return false;
-      }
-      pos -= Bytes.SIZEOF_INT;
+    /**
+     * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
+     * 
+     * @param bytes
+     *            array to read from
+     * @param start
+     *            start point, backwards from which to read. For example, if specifying "25", we would try to read an
+     *            integer from 21 -> 25
+     * @return an integer from the preceding {@value Bytes#SIZEOF_INT} bytes, if it exists.
+     */
+    private static int getPreviousInteger(byte[] bytes, int start) {
+        return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
     }
 
-    return true;
-  }
+    /**
+     * Check to see if a row key just contains a list of null values.
+     * 
+     * @param bytes
+     *            row key to examine
+     * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
+     */
+    public static boolean checkRowKeyForAllNulls(byte[] bytes) {
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        for (int i = 0; i < keyCount; i++) {
+            int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            if (next > 0) { return false; }
+            pos -= Bytes.SIZEOF_INT;
+        }
 
-  @Override
-  public boolean isEnabled(Mutation m) {
-    // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
-    // simple check for the moment.
-    return groups.size() > 0;
-  }
+        return true;
+    }
+
+    @Override
+    public boolean isEnabled(Mutation m) {
+        // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
+        // simple check for the moment.
+        return groups.size() > 0;
+    }
 }
\ No newline at end of file
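
For readers following the codec above: composeRowKey lays the index row key out as [value1]..[valueN][pk][len1]..[lenN][N], and getValues walks the trailing integers backwards to slice the values back out. A minimal in-package round-trip sketch (the column references c1/c2 and the literal values are hypothetical, not part of this patch):

    // two hypothetical covered columns; any configured CoveredColumns would do
    ColumnEntry e1 = new ColumnEntry(Bytes.toBytes("v1"), c1);   // 2 bytes
    ColumnEntry e2 = new ColumnEntry(Bytes.toBytes("val2"), c2); // 4 bytes
    byte[] pk = Bytes.toBytes("row1");
    // total value length = 2 + 4
    byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, 6, Arrays.asList(e1, e2));
    // decode: read the trailing count, then the per-value lengths, then slice
    List<byte[]> values = CoveredColumnIndexCodec.getValues(rowKey);
    assert values.size() == 2 && Bytes.toString(values.get(1)).equals("val2");
    // all-null detection only inspects the trailing length suffixes
    assert !CoveredColumnIndexCodec.checkRowKeyForAllNulls(rowKey);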

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
index 6ac89d1..48c714d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 
 /**
@@ -136,7 +136,7 @@ public class CoveredColumnIndexSpecifierBuilder {
  void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException {
     // add the codec for the index to the map of options
     Map<String, String> opts = this.convertToMap();
-    opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
+    opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
     Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER);
   }
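
For context, the build(desc, clazz) shown above is the hook that ties a codec to the indexer: it stashes the codec class name in the table options and registers CoveredColumnIndexer as the coprocessor. A rough usage sketch (the table name is hypothetical, and the index-group setup is elided):

    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("primary"));
    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
    // ... addIndexGroup(...) calls elided ...
    // stores the codec class under NonTxIndexBuilder.CODEC_CLASS_NAME_KEY and
    // enables CoveredColumnIndexer at Coprocessor.PRIORITY_USER on the descriptor
    builder.build(desc, CoveredColumnIndexCodec.class);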
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index f80cf41..60698c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -35,8 +36,9 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 
 /**
@@ -90,7 +92,7 @@ import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
  * <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of the entire row
  * <i>every time there is a write to the table</i>.
  */
-public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
+public class CoveredColumnIndexer extends NonTxIndexBuilder {
 
   /**
    * Create the specified index table with the necessary columns
@@ -117,17 +119,16 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
 
   @Override
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered) throws IOException {
-
+      Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
     // stores all the return values
     IndexUpdateManager updateMap = new IndexUpdateManager();
     // batch the updates by row to make life easier and ordered
     Collection<Batch> batches = batchByRow(filtered);
 
     for (Batch batch : batches) {
-      KeyValue curKV = batch.getKvs().iterator().next();
+      Cell curKV = batch.getKvs().iterator().next();
       Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength());
-      for (KeyValue kv : batch.getKvs()) {
+      for (Cell kv : batch.getKvs()) {
         // we only need to cleanup Put entries
         byte type = kv.getTypeByte();
         Type t = KeyValue.Type.codeToType(type);
@@ -145,7 +146,7 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
       for (Batch entry : timeBatch) {
        //just set the timestamp on the table - it already has all the future state
         state.setCurrentTimestamp(entry.getTimestamp());
-        this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp());
+        this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp(), indexMetaData);
       }
     }
     return updateMap.toMap();
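
One detail worth calling out in the hunk above: the loop now reads Cells but still keys the cleanup decision off the raw type byte, exactly as the KeyValue-based code did. Stated standalone (isPutCell is a hypothetical helper, not part of the patch):

    // we only need to clean up Put entries, mirroring the check in the loop above
    static boolean isPutCell(Cell kv) {
        byte type = kv.getTypeByte();
        return KeyValue.Type.codeToType(type) == KeyValue.Type.Put;
    }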

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
index b9f3858..7c69493 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.hbase.index.covered.update;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 
@@ -45,7 +44,7 @@ public class ColumnTracker implements IndexedColumnGroup {
   public ColumnTracker(Collection<? extends ColumnReference> columns) {
     this.columns = new ArrayList<ColumnReference>(columns);
     // sort the columns
-    Collections.sort(this.columns);
+    // no need to do this: Collections.sort(this.columns);
     this.hashCode = calcHashCode(this.columns);
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index d09498a..26f620f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
 /**
  * Keeps track of the index updates
  */
@@ -182,8 +181,6 @@ public class IndexUpdateManager {
     for (Entry<ImmutableBytesPtr, Collection<Mutation>> updates : map.entrySet()) {
       // get is ok because we always set with just the bytes
       byte[] tableName = updates.getKey().get();
-      // TODO replace this as just storing a byte[], to avoid all the String <-> byte[] swapping
-      // HBase does
       for (Mutation m : updates.getValue()) {
         // skip elements that have been marked for delete
         if (shouldBeRemoved(m)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
index a28268c..884cca6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.hbase.index.scanner;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 
 
 /**
@@ -29,17 +29,17 @@ import org.apache.hadoop.hbase.KeyValue;
 public class EmptyScanner implements Scanner {
 
   @Override
-  public KeyValue next() throws IOException {
+  public Cell next() throws IOException {
     return null;
   }
 
   @Override
-  public boolean seek(KeyValue next) throws IOException {
+  public boolean seek(Cell next) throws IOException {
     return false;
   }
 
   @Override
-  public KeyValue peek() throws IOException {
+  public Cell peek() throws IOException {
     return null;
   }
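
With the interface migrated from KeyValue to Cell, EmptyScanner stays a null object: every method reports end-of-data. A minimal usage sketch (assumes it runs in a method that may throw IOException, and that KeyValueUtil.createFirstOnRow is available in the HBase version in use):

    Scanner scanner = new EmptyScanner();
    assert scanner.peek() == null && scanner.next() == null; // nothing buffered, nothing to pop
    // seeking finds no key at or after any target
    assert !scanner.seek(KeyValueUtil.createFirstOnRow(Bytes.toBytes("any-row")));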
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
index 43ddc45..9454de5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
@@ -44,7 +44,7 @@ public interface Scanner extends Closeable {
   * @return <tt>true</tt> if there are values left in <tt>this</tt>, <tt>false</tt> otherwise
    * @throws IOException if there is an error reading the underlying data.
    */
-  public boolean seek(KeyValue next) throws IOException;
+  public boolean seek(Cell next) throws IOException;
 
   /**
   * Read the {@link KeyValue} at the top of <tt>this</tt> without 'popping' it off the top of the

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index ff33ec2..e120268 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -34,8 +34,6 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.hbase.index.covered.KeyValueStore;
 import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
 import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
@@ -43,6 +41,8 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
+import com.google.common.collect.Lists;
+
 /**
  *
  */
@@ -59,6 +59,7 @@ public class ScannerBuilder {
 
  public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
 
+    // TODO: This needs to use some form of the filter that Tephra has when transactional
     Filter columnFilters = getColumnFilters(indexedColumns);
     FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
 
@@ -136,7 +137,7 @@ public class ScannerBuilder {
       }
 
       @Override
-      public boolean seek(KeyValue next) throws IOException {
+      public boolean seek(Cell next) throws IOException {
        // check to see if the next kv is after the current key, in which case we can use reseek,
         // which will be more efficient
         Cell peek = kvScanner.peek();
@@ -144,13 +145,13 @@ public class ScannerBuilder {
         if (peek != null) {
           int compare = KeyValue.COMPARATOR.compare(peek, next);
           if (compare < 0) {
-            return kvScanner.reseek(next);
+            return kvScanner.reseek(KeyValueUtil.ensureKeyValue(next));
           } else if (compare == 0) {
             // we are already at the given key!
             return true;
           }
         }
-        return kvScanner.seek(next);
+        return kvScanner.seek(KeyValueUtil.ensureKeyValue(next));
       }
 
       @Override
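
The seek change above is also where the Cell-typed API gets bridged back to HBase's KeyValue-typed scanner: KeyValueUtil.ensureKeyValue is, to the best of this reading, a cast when the Cell already is a KeyValue and a copy otherwise, and reseek is preferred when the target lies ahead because it continues from the scanner's current position instead of restarting. Condensed (omitting the compare == 0 short-circuit shown above):

    // forward move: reseek continues from the current position, so it's cheaper
    if (peek != null && KeyValue.COMPARATOR.compare(peek, next) < 0) {
        return kvScanner.reseek(KeyValueUtil.ensureKeyValue(next));
    }
    // otherwise fall back to a full seek from the start
    return kvScanner.seek(KeyValueUtil.ensureKeyValue(next));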
