http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index e4bc193..c4ed7a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -1,19 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. */ package org.apache.phoenix.hbase.index.covered; @@ -34,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.util.Pair; - +import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.data.IndexMemStore; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -42,203 +34,244 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; + +import com.google.inject.Key; /** * Manage the state of the HRegion's view of the table, for the single row. * <p> - * Currently, this is a single-use object - you need to create a new one for each row that you need - * to manage. In the future, we could make this object reusable, but for the moment its easier to - * manage as a throw-away object. 
+ * Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the + * future, we could make this object reusable, but for the moment it's easier to manage as a throw-away object. * <p> - * This class is <b>not</b> thread-safe - it requires external synchronization is access - * concurrently. + * This class is <b>not</b> thread-safe - it requires external synchronization if accessed concurrently. */ public class LocalTableState implements TableState { - private long ts; - private RegionCoprocessorEnvironment env; - private KeyValueStore memstore; - private LocalHBaseState table; - private Mutation update; - private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>(); - private ScannerBuilder scannerBuilder; - private List<KeyValue> kvs = new ArrayList<KeyValue>(); - private List<? extends IndexedColumnGroup> hints; - private CoveredColumns columnSet; - - public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) { - this.env = environment; - this.table = table; - this.update = update; - this.memstore = new IndexMemStore(); - this.scannerBuilder = new ScannerBuilder(memstore, update); - this.columnSet = new CoveredColumns(); - } - - public void addPendingUpdates(KeyValue... kvs) { - if (kvs == null) return; - addPendingUpdates(Arrays.asList(kvs)); - } - - public void addPendingUpdates(List<KeyValue> kvs) { - if(kvs == null) return; - setPendingUpdates(kvs); - addUpdate(kvs); - } - - private void addUpdate(List<KeyValue> list) { - addUpdate(list, true); - } - - private void addUpdate(List<KeyValue> list, boolean overwrite) { - if (list == null) return; - for (KeyValue kv : list) { - this.memstore.add(kv, overwrite); - } - } - - @Override - public RegionCoprocessorEnvironment getEnvironment() { - return this.env; - } - - @Override - public long getCurrentTimestamp() { - return this.ts; - } - - @Override - public void setCurrentTimestamp(long timestamp) { - this.ts = timestamp; - } - - public void resetTrackedColumns() { - this.trackedColumns.clear(); - } - - public Set<ColumnTracker> getTrackedColumns() { - return this.trackedColumns; - } - - @Override - public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( - Collection<? extends ColumnReference> indexedColumns) throws IOException { - ensureLocalStateInitialized(indexedColumns); - // filter out things with a newer timestamp and track the column references to which it applies - ColumnTracker tracker = new ColumnTracker(indexedColumns); - synchronized (this.trackedColumns) { - // we haven't seen this set of columns before, so we need to create a new tracker - if (!this.trackedColumns.contains(tracker)) { - this.trackedColumns.add(tracker); - } - } - - Scanner scanner = - this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts); - - return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); - } - - /** - * Initialize the managed local state. Generally, this will only be called by - * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. - * Even then, there is still fairly low contention as each new Put/Delete will have its own table - * state. - */ - private synchronized void ensureLocalStateInitialized( - Collection<? 
extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns); - // we have all the columns loaded, so we are good to go. - if (toCover.isEmpty()) { - return; - } - - // add the current state of the row - this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false); - - // add the covered columns to the set - for (ColumnReference ref : toCover) { - this.columnSet.addColumn(ref); - } - } - - @Override - public Map<String, byte[]> getUpdateAttributes() { - return this.update.getAttributesMap(); - } - - @Override - public byte[] getCurrentRowKey() { - return this.update.getRow(); - } - - public Result getCurrentRowState() { - KeyValueScanner scanner = this.memstore.getScanner(); - List<Cell> kvs = new ArrayList<Cell>(); - while (scanner.peek() != null) { - try { - kvs.add(scanner.next()); - } catch (IOException e) { - // this should never happen - something has gone terribly arwy if it has - throw new RuntimeException("Local MemStore threw IOException!"); - } - } - return Result.create(kvs); - } - - /** - * Helper to add a {@link Mutation} to the values stored for the current row - * @param pendingUpdate update to apply - */ - public void addUpdateForTesting(Mutation pendingUpdate) { - for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) { - List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue()); - addUpdate(edits); - } - } - - /** - * @param hints - */ - public void setHints(List<? extends IndexedColumnGroup> hints) { - this.hints = hints; - } - - @Override - public List<? extends IndexedColumnGroup> getIndexColumnHints() { - return this.hints; - } - - @Override - public Collection<KeyValue> getPendingUpdate() { - return this.kvs; - } - - /** - * Set the {@link KeyValue}s in the update for which we are currently building an index update, - * but don't actually apply them. - * @param update pending {@link KeyValue}s - */ - public void setPendingUpdates(Collection<KeyValue> update) { - this.kvs.clear(); - this.kvs.addAll(update); - } - - /** - * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}. - */ - public void applyPendingUpdates() { - this.addUpdate(kvs); - } - - /** - * Rollback all the given values from the underlying state. - * @param values - */ - public void rollback(Collection<KeyValue> values) { - for (KeyValue kv : values) { - this.memstore.rollback(kv); - } - } + private long ts; + private RegionCoprocessorEnvironment env; + private KeyValueStore memstore; + private LocalHBaseState table; + private Mutation update; + private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>(); + private ScannerBuilder scannerBuilder; + private List<KeyValue> kvs = new ArrayList<KeyValue>(); + private List<? extends IndexedColumnGroup> hints; + private CoveredColumns columnSet; + + public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) { + this.env = environment; + this.table = table; + this.update = update; + this.memstore = new IndexMemStore(); + this.scannerBuilder = new ScannerBuilder(memstore, update); + this.columnSet = new CoveredColumns(); + } + + public void addPendingUpdates(KeyValue... 
kvs) { + if (kvs == null) return; + addPendingUpdates(Arrays.asList(kvs)); + } + + public void addPendingUpdates(List<KeyValue> kvs) { + if (kvs == null) return; + setPendingUpdates(kvs); + addUpdate(kvs); + } + + private void addUpdate(List<KeyValue> list) { + addUpdate(list, true); + } + + private void addUpdate(List<KeyValue> list, boolean overwrite) { + if (list == null) return; + for (KeyValue kv : list) { + this.memstore.add(kv, overwrite); + } + } + + @Override + public RegionCoprocessorEnvironment getEnvironment() { + return this.env; + } + + @Override + public long getCurrentTimestamp() { + return this.ts; + } + + /** + * Set the current timestamp up to which the table should allow access to the underlying table. + * This overrides the timestamp view provided by the indexer - use with care! + * @param timestamp timestamp up to which the table should allow access. + */ + public void setCurrentTimestamp(long timestamp) { + this.ts = timestamp; + } + + public void resetTrackedColumns() { + this.trackedColumns.clear(); + } + + public Set<ColumnTracker> getTrackedColumns() { + return this.trackedColumns; + } + + /** + * Get a scanner on the columns that are needed by the index. + * <p> + * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given + * columns with a timestamp earlier than the timestamp to which the table is currently set (the + * current state of the table for which we need to build an update). + * <p> + * If none of the passed columns matches any of the columns in the pending update (as determined + * by {@link ColumnReference#matchesFamily(byte[])} and + * {@link ColumnReference#matchesQualifier(byte[])}), then an empty scanner will be returned. This + * is because it doesn't make sense to build index updates when there is no change in the table + * state for any of the columns you are indexing. + * <p> + * <i>NOTE:</i> This method should <b>not</b> be used during + * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been + * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i> + * need to track the indexed columns. + * <p> + * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you + * request - you will never see a column with the timestamp we are tracking, but the next oldest + * timestamp for that column. + * @param indexedColumns the columns that will be indexed + * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to + * the builder. Even if no update is necessary for the requested columns, you still need + * to return the {@link IndexUpdate}, just don't set the update for the + * {@link IndexUpdate}. + * @throws IOException + */ + public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( + Collection<? 
extends ColumnReference> indexedColumns) throws IOException { + ensureLocalStateInitialized(indexedColumns); + // filter out things with a newer timestamp and track the column references to which it applies + ColumnTracker tracker = new ColumnTracker(indexedColumns); + synchronized (this.trackedColumns) { + // we haven't seen this set of columns before, so we need to create a new tracker + if (!this.trackedColumns.contains(tracker)) { + this.trackedColumns.add(tracker); + } + } + + Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts); + + return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker)); + } + + /** + * Initialize the managed local state. Generally, this will only be called by + * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even + * then, there is still fairly low contention as each new Put/Delete will have its own table state. + */ + private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns) + throws IOException { + // check to see if we haven't initialized any columns yet + Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns); + // we have all the columns loaded, so we are good to go. + if (toCover.isEmpty()) { return; } + + // add the current state of the row + this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false); + + // add the covered columns to the set + for (ColumnReference ref : toCover) { + this.columnSet.addColumn(ref); + } + } + + @Override + public Map<String, byte[]> getUpdateAttributes() { + return this.update.getAttributesMap(); + } + + @Override + public byte[] getCurrentRowKey() { + return this.update.getRow(); + } + + public Result getCurrentRowState() { + KeyValueScanner scanner = this.memstore.getScanner(); + List<Cell> kvs = new ArrayList<Cell>(); + while (scanner.peek() != null) { + try { + kvs.add(scanner.next()); + } catch (IOException e) { + // this should never happen - something has gone terribly awry if it has + throw new RuntimeException("Local MemStore threw IOException!"); + } + } + return Result.create(kvs); + } + + /** + * Helper to add a {@link Mutation} to the values stored for the current row + * + * @param pendingUpdate + * update to apply + */ + public void addUpdateForTesting(Mutation pendingUpdate) { + for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) { + List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue()); + addUpdate(edits); + } + } + + /** + * @param hints + */ + public void setHints(List<? extends IndexedColumnGroup> hints) { + this.hints = hints; + } + + @Override + public List<? extends IndexedColumnGroup> getIndexColumnHints() { + return this.hints; + } + + @Override + public Collection<KeyValue> getPendingUpdate() { + return this.kvs; + } + + /** + * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually + * apply them. + * + * @param update + * pending {@link KeyValue}s + */ + public void setPendingUpdates(Collection<KeyValue> update) { + this.kvs.clear(); + this.kvs.addAll(update); + } + + /** + * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}. + */ + public void applyPendingUpdates() { + this.addUpdate(kvs); + } + + /** + * Rollback all the given values from the underlying state. 
+ * + * @param values + */ + public void rollback(Collection<KeyValue> values) { + for (KeyValue kv : values) { + this.memstore.rollback(kv); + } + } + + @Override + public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns) + throws IOException { + Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns); + ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey()); + return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond()); + } } \ No newline at end of file
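For orientation, here is a minimal sketch (not part of this commit) of the calling pattern the class above is built around: stage one timestamp batch, read back the indexed columns, then roll back if the pre-update view is needed again. The inputs env, localTable, mutation, batchKvs, batchTs and indexedColumns are assumed to be in scope; only methods shown in the diff above are used.

    // Hypothetical driver code, assuming env, localTable, mutation, batchKvs (List<KeyValue>),
    // batchTs and indexedColumns are provided by the surrounding index builder.
    LocalTableState state = new LocalTableState(env, localTable, mutation);
    state.setPendingUpdates(batchKvs);    // stage the batch without applying it
    state.setCurrentTimestamp(batchTs);   // view the row as of the batch timestamp
    state.applyPendingUpdates();          // roll the staged KeyValues into the local memstore
    // read the indexed columns (pending update now visible) to build the index row
    Pair<ValueGetter, IndexUpdate> update = state.getIndexUpdateState(indexedColumns);
    // ... build index mutations from update.getFirst() and update.getSecond() ...
    state.rollback(batchKvs);             // restore the pre-batch view when cleaning up old entries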
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java new file mode 100644 index 0000000..27af40f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder; +import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; +import org.apache.phoenix.hbase.index.covered.data.LocalTable; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; + +/** + * Build covered indexes for phoenix updates. + * <p> + * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't need to do any extra + * synchronization in the IndexBuilder. + * <p> + * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or flush, leading to a + * bloated index that needs to be cleaned up by a background process. 
+ */ +public class NonTxIndexBuilder extends BaseIndexBuilder { + private static final Log LOG = LogFactory.getLog(NonTxIndexBuilder.class); + + protected LocalHBaseState localTable; + + @Override + public void setup(RegionCoprocessorEnvironment env) throws IOException { + super.setup(env); + this.localTable = new LocalTable(env); + } + + @Override + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException { + // create a state manager, so we can manage each batch + LocalTableState state = new LocalTableState(env, localTable, mutation); + // build the index updates for each group + IndexUpdateManager manager = new IndexUpdateManager(); + + batchMutationAndAddUpdates(manager, state, mutation, indexMetaData); + + if (LOG.isDebugEnabled()) { + LOG.debug("Found index updates for Mutation: " + mutation + "\n" + manager); + } + + return manager.toMap(); + } + + /** + * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each key-value in the + * update to see if it matches the others. Generally, this will be the case, but you can add kvs to a mutation that + * don't all have the same timestamp, so we need to manage everything in batches based on timestamp. + * <p> + * Adds all the updates in the {@link Mutation} to the state, as a side-effect. + * @param state + * current state of the row for the mutation. + * @param m + * mutation to batch + * @param indexMetaData TODO + * @param manager + * index updates into which to add new updates. Modified as a side-effect. + * + * @throws IOException + */ + private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException { + // split the mutation into timestamp-based batches + Collection<Batch> batches = createTimestampBatchesFromMutation(m); + + // go through each batch of keyvalues and build separate index entries for each + boolean cleanupCurrentState = true; + for (Batch batch : batches) { + /* + * We have to split the work between the cleanup and the update for each group because when we update the + * current state of the row for the current batch (appending the mutations for the current batch) the next + * group will see that as the current state, which can cause a delete and a put to be created for + * the next group. + */ + if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) { + cleanupCurrentState = false; + } + } + } + + /** + * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp + * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. 
+ * + * @param m + * {@link Mutation} from which to extract the {@link KeyValue}s + * @return the mutation, broken into batches and sorted in ascending order (smallest first) + */ + protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) { + Map<Long, Batch> batches = new HashMap<Long, Batch>(); + for (List<Cell> family : m.getFamilyCellMap().values()) { + List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family); + createTimestampBatchesFromKeyValues(familyKVs, batches); + } + // sort the batches + List<Batch> sorted = new ArrayList<Batch>(batches.values()); + Collections.sort(sorted, new Comparator<Batch>() { + @Override + public int compare(Batch o1, Batch o2) { + return Longs.compare(o1.getTimestamp(), o2.getTimestamp()); + } + }); + return sorted; + } + + /** + * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a + * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called. + * + * @param kvs + * {@link KeyValue}s to break into batches + * @param batches + * to update with the given kvs + */ + protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) { + long now = EnvironmentEdgeManager.currentTime(); + byte[] nowBytes = Bytes.toBytes(now); + + // batch kvs by timestamp + for (KeyValue kv : kvs) { + long ts = kv.getTimestamp(); + // override the timestamp to the current time, so the index and primary tables match + // all the keys with LATEST_TIMESTAMP will then be put into the same batch + if (kv.updateLatestStamp(nowBytes)) { + ts = now; + } + Batch batch = batches.get(ts); + if (batch == null) { + batch = new Batch(ts); + batches.put(ts, batch); + } + batch.add(kv); + } + } + + /** + * For a single batch, get all the index updates and add them to the updateMap + * <p> + * This method manages cleaning up the entire history of the row from the given timestamp forward for out-of-order + * (e.g. 'back in time') updates. + * <p> + * If things arrive out of order (client is using custom timestamps) we should still see the index in the correct + * order (assuming we scan after the out-of-order update is finished). Therefore, when we aren't the most recent + * update to the index, we need to delete the state at the current timestamp (similar to above), but also issue a + * delete for the added index updates at the next newest timestamp of any of the columns in the update; we need to + * cleanup the insert so it looks like it was also deleted at that next newest timestamp. However, it's not enough to + * just update the one in front of us - that column will likely be applied to index entries up the entire history in + * front of us, which also needs to be fixed up. + * <p> + * However, the current update usually will be the most recent thing to be added. In that case, all we need to do is + * issue a delete for the previous index row (the state of the row, without the update applied) at the current + * timestamp. This gets rid of anything currently in the index for the current state of the row (at the timestamp). + * Then we can just follow that by applying the pending update and building the index update based on the new row + * state. 
+ * + * @param updateMap + * map to update with new index elements + * @param batch + * timestamp-based batch of edits + * @param state + * local state to update and pass to the codec + * @param requireCurrentStateCleanup + * <tt>true</tt> if we should attempt to cleanup the current state of the table, in the event of a + * 'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier + * batch already did the cleanup. + * @param indexMetaData TODO + * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt> + * otherwise + * @throws IOException + */ + private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state, + boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException { + + // need a temporary manager for the current batch. It should resolve any conflicts for the + // current batch. Essentially, we can get the case where a batch doesn't change the current + // state of the index (all Puts are covered by deletes), in which case we don't want to add + // anything + // A. Get the correct values for the pending state in the batch + // A.1 start by cleaning up the current state - as long as there are key-values in the batch + // that are indexed, we need to change the current state of the index. It's up to the codec to + // determine if we need to make any cleanup given the pending update. + long batchTs = batch.getTimestamp(); + state.setPendingUpdates(batch.getKvs()); + addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData); + + // A.2 do a single pass first for the updates to the current state + state.applyPendingUpdates(); + long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData); + // if all the updates are the latest thing in the index, we are done - don't go and fix history + if (ColumnTracker.isNewestTime(minTs)) { return false; } + + // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the + // index. after this, we have the correct view of the index, from the batch up to the index + while (!ColumnTracker.isNewestTime(minTs)) { + minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData); + } + + // B. only cleanup the current state if we need to - it's a huge waste of effort otherwise. + if (requireCurrentStateCleanup) { + // roll back the pending update. This is needed so we can remove all the 'old' index entries. + // We don't need to do the puts here, but just the deletes at the given timestamps since we + // just want to completely hide the incorrect entries. + state.rollback(batch.getKvs()); + // setup state + state.setPendingUpdates(batch.getKvs()); + + // cleanup the pending batch. If anything in the correct history is covered by Deletes used to + // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both + // because the update may have a different set of columns or value based on the update). 
+ cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData); + + // have to roll the state forward again, so the current state is correct + state.applyPendingUpdates(); + return true; + } + return false; + } + + private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData) + throws IOException { + state.setCurrentTimestamp(ts); + ts = addCurrentStateMutationsForBatch(updateMap, state, indexMetaData); + return ts; + } + + private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData) + throws IOException { + // get the cleanup for the current state + state.setCurrentTimestamp(batchTs); + addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData); + // ignore any index tracking from the delete + state.resetTrackedColumns(); + } + + /** + * Add the necessary mutations for the pending batch on the local state. Handles rolling up through history to + * determine the index changes after applying the batch (for the case where the batch is back in time). + * + * @param updateMap + * to update with index mutations + * @param state + * current state of the table + * @param indexMetaData TODO + * @param batch + * to apply to the current state + * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)} + * returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>. + * @throws IOException + */ + private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state, IndexMetaData indexMetaData) + throws IOException { + + // get the index updates for this current batch + Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData); + state.resetTrackedColumns(); + + /* + * go through all the pending updates. If we are sure that all the entries are the latest timestamp, we can just + * add the index updates and move on. However, if there are columns that we skip past (based on the timestamp of + * the batch), we need to roll back up the history. Regardless of whether or not they are the latest timestamp, + * the entries here are going to be correct for the current batch timestamp, so we add them to the updates. The + * only thing we really care about is whether we need to roll up the history and fix it as we go. + */ + // timestamp of the next update we need to track + long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP; + List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>(); + for (IndexUpdate update : upserts) { + // this is the one bit where we check the timestamps + final ColumnTracker tracker = update.getIndexedColumns(); + long trackerTs = tracker.getTS(); + // update the next min TS we need to track + if (trackerTs < minTs) { + minTs = tracker.getTS(); + } + // track index hints for the next round. Hint if we need an update for that column for the + // next timestamp. These columns clearly won't need to update as we go through time as they + // already match the most recent possible thing. + boolean needsCleanup = false; + if (tracker.hasNewerTimestamps()) { + columnHints.add(tracker); + // this update also needs to be cleaned up at the next timestamp because it is not the latest. 
+ needsCleanup = true; + } + + // only make the put if the index update has been setup + if (update.isValid()) { + byte[] table = update.getTableName(); + Mutation mutation = update.getUpdate(); + updateMap.addIndexUpdate(table, mutation); + + // only make the cleanup if we made a put and need cleanup + if (needsCleanup) { + // there is a TS for the interested columns that is greater than the columns in the + // put. Therefore, we need to issue a delete at the same timestamp + Delete d = new Delete(mutation.getRow()); + d.setTimestamp(tracker.getTS()); + updateMap.addIndexUpdate(table, d); + } + } + } + return minTs; + } + + /** + * Cleanup the index based on the current state from the given batch. Iterates over each timestamp (for the indexed + * rows) for the current state of the table and cleans up all the existing entries generated by the codec. + * <p> + * Adds all pending updates to the updateMap + * + * @param updateMap + * updated with the pending index updates from the codec + * @param batchTs + * timestamp from which we should cleanup + * @param state + * current state of the primary table. Should already be setup to the correct state from which we want to + * cleanup. + * @param indexMetaData TODO + * @throws IOException + */ + private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData) + throws IOException { + // get the cleanup for the current state + state.setCurrentTimestamp(batchTs); + addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData); + Set<ColumnTracker> trackers = state.getTrackedColumns(); + long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP; + for (ColumnTracker tracker : trackers) { + if (tracker.getTS() < minTs) { + minTs = tracker.getTS(); + } + } + state.resetTrackedColumns(); + if (!ColumnTracker.isNewestTime(minTs)) { + state.setHints(Lists.newArrayList(trackers)); + cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData); + } + } + + /** + * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the + * update map. + * <p> + * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc). + * @param indexMetaData TODO + * + * @throws IOException + */ + protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData) + throws IOException { + Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData); + if (cleanup != null) { + for (IndexUpdate d : cleanup) { + if (!d.isValid()) { + continue; + } + // override the timestamps in the delete to match the current batch. 
+ Delete remove = (Delete)d.getUpdate(); + remove.setTimestamp(ts); + updateMap.addIndexUpdate(d.getTableName(), remove); + } + } + } + + @Override + public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData indexMetaData) + throws IOException { + // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java index 4c4d0b0..0e961db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java @@ -23,14 +23,14 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; - +import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; -import org.apache.phoenix.hbase.index.scanner.Scanner; /** * Interface for the current state of the table. This is generally going to be as of a timestamp - a @@ -52,46 +52,14 @@ public interface TableState { public long getCurrentTimestamp(); /** - * Set the current timestamp up to which the table should allow access to the underlying table. - * This overrides the timestamp view provided by the indexer - use with care! - * @param timestamp timestamp up to which the table should allow access. - */ - public void setCurrentTimestamp(long timestamp); - - /** * @return the attributes attached to the current update (e.g. {@link Mutation}). */ public Map<String, byte[]> getUpdateAttributes(); /** - * Get a scanner on the columns that are needed by the index. - * <p> - * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given - * columns with a timestamp earlier than the timestamp to which the table is currently set (the - * current state of the table for which we need to build an update). - * <p> - * If none of the passed columns matches any of the columns in the pending update (as determined - * by {@link ColumnReference#matchesFamily(byte[])} and - * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This - * is because it doesn't make sense to build index updates when there is no change in the table - * state for any of the columns you are indexing. - * <p> - * <i>NOTE:</i> This method should <b>not</b> be used during - * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been - * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i> - * need to track the indexed columns. - * <p> - * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you - * request - you will never see a column with the timestamp we are tracking, but the next oldest - * timestamp for that column. 
- * @param indexedColumns the columns to that will be indexed - * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to - * the builder. Even if no update is necessary for the requested columns, you still need - * to return the {@link IndexUpdate}, just don't set the update for the - * {@link IndexUpdate}. - * @throws IOException + * Get a getter interface for the state of the index row */ - Pair<Scanner, IndexUpdate> getIndexedColumnsTableState( + Pair<ValueGetter, IndexUpdate> getIndexUpdateState( Collection<? extends ColumnReference> indexedColumns) throws IOException; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java index 7fbc7f3..52076a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java @@ -24,7 +24,8 @@ import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; - +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; @@ -37,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; public class LazyValueGetter implements ValueGetter { private Scanner scan; - private volatile Map<ColumnReference, ImmutableBytesPtr> values; + private volatile Map<ColumnReference, ImmutableBytesWritable> values; private byte[] row; /** @@ -51,16 +52,16 @@ public class LazyValueGetter implements ValueGetter { } @Override - public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException { + public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException { // ensure we have a backing map if (values == null) { synchronized (this) { - values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesPtr>()); + values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesWritable>()); } } // check the value in the map - ImmutableBytesPtr value = values.get(ref); + ImmutableBytesWritable value = values.get(ref); if (value == null) { value = get(ref); values.put(ref, value); @@ -80,7 +81,7 @@ public class LazyValueGetter implements ValueGetter { } // there is a next value - we only care about the current value, so we can just snag that Cell next = scan.next(); - if (ref.matches(next)) { + if (ref.matches(KeyValueUtil.ensureKeyValue(next))) { return new ImmutableBytesPtr(next.getValueArray(), next.getValueOffset(), next.getValueLength()); } return null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 
4b953e6..4efca9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -1,19 +1,11 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. 
*/ package org.apache.phoenix.hbase.index.covered.example; @@ -32,337 +24,341 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; - -import com.google.common.collect.Lists; +import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.IndexUpdate; +import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.scanner.Scanner; -import org.apache.phoenix.index.BaseIndexCodec; + +import com.google.common.collect.Lists; /** * */ public class CoveredColumnIndexCodec extends BaseIndexCodec { - private static final byte[] EMPTY_BYTES = new byte[0]; - public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS"); + private static final byte[] EMPTY_BYTES = new byte[0]; + public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS"); - private List<ColumnGroup> groups; + private List<ColumnGroup> groups; - /** - * @param groups to initialize the codec with - * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing - * purposes - */ - public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) { - CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec(); - codec.groups = Lists.newArrayList(groups); - return codec; - } - - @Override - public void initialize(RegionCoprocessorEnvironment env) { - groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration()); - } - - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) { - List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); - for (ColumnGroup group : groups) { - IndexUpdate update = getIndexUpdateForGroup(group, state); - updates.add(update); + /** + * @param groups + * to initialize the codec with + * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes + */ + public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) { + CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec(); + codec.groups = Lists.newArrayList(groups); + return codec; } - return updates; - } - /** - * @param group - * @param state - * @return the update that should be made to the table - */ - private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) { - List<CoveredColumn> refs = group.getColumns(); - try { - Pair<Scanner, IndexUpdate> stateInfo = state.getIndexedColumnsTableState(refs); - Scanner kvs = stateInfo.getFirst(); - Pair<Integer, List<ColumnEntry>> columns = - getNextEntries(refs, kvs, state.getCurrentRowKey()); - // make sure we close the scanner - kvs.close(); - if (columns.getFirst().intValue() == 0) { - return stateInfo.getSecond(); - } - // have all the column entries, so just turn it into a Delete for the row - // convert the entries to the needed values - byte[] rowKey = - composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); - Put p = new Put(rowKey, state.getCurrentTimestamp()); - // add the columns to the put - addColumnsToPut(p, columns.getSecond()); - - // update the index info - IndexUpdate update = stateInfo.getSecond(); - update.setTable(Bytes.toBytes(group.getTable())); - update.setUpdate(p); - return update; - } catch 
(IOException e) { - throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + @Override + public void initialize(RegionCoprocessorEnvironment env) { + groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration()); } - } - private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) { - // add each of the corresponding families to the put - int count = 0; - for (ColumnEntry column : columns) { - indexInsert.add(INDEX_ROW_COLUMN_FAMILY, - ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null); - } + @Override + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) { + List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); + for (ColumnGroup group : groups) { + IndexUpdate update = getIndexUpdateForGroup(group, state); + updates.add(update); + } + return updates; } - private static byte[] toIndexQualifier(CoveredColumn column) { - return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), - column.getQualifier()); - } + /** + * @param group + * @param state + * @return the update that should be made to the table + */ + private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) { + List<CoveredColumn> refs = group.getColumns(); + try { + Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs); + Scanner kvs = stateInfo.getFirst(); + Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey()); + // make sure we close the scanner + kvs.close(); + if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); } + // have all the column entries, so just turn it into a Delete for the row + // convert the entries to the needed values + byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); + Put p = new Put(rowKey, state.getCurrentTimestamp()); + // add the columns to the put + addColumnsToPut(p, columns.getSecond()); + + // update the index info + IndexUpdate update = stateInfo.getSecond(); + update.setTable(Bytes.toBytes(group.getTable())); + update.setUpdate(p); + return update; + } catch (IOException e) { + throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + } + } - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) { - List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); - for (ColumnGroup group : groups) { - deletes.add(getDeleteForGroup(group, state)); + private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) { + // add each of the corresponding families to the put + int count = 0; + for (ColumnEntry column : columns) { + indexInsert.add(INDEX_ROW_COLUMN_FAMILY, + ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null); + } } - return deletes; - } + private static byte[] toIndexQualifier(CoveredColumn column) { + return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier()); + } - /** - * Get all the deletes necessary for a group of columns - logically, the cleanup the index table - * for a given index. 
- * @param group index information - * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary - */ - private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) { - List<CoveredColumn> refs = group.getColumns(); - try { - Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs); - Pair<Integer, List<ColumnEntry>> columns = - getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); - // make sure we close the scanner reference - kvs.getFirst().close(); - // no change, just return the passed update - if (columns.getFirst() == 0) { - return kvs.getSecond(); - } - // have all the column entries, so just turn it into a Delete for the row - // convert the entries to the needed values - byte[] rowKey = - composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); - Delete d = new Delete(rowKey); - d.setTimestamp(state.getCurrentTimestamp()); - IndexUpdate update = kvs.getSecond(); - update.setUpdate(d); - update.setTable(Bytes.toBytes(group.getTable())); - return update; - } catch (IOException e) { - throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); + @Override + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { + List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); + for (ColumnGroup group : groups) { + deletes.add(getDeleteForGroup(group, state)); + } + return deletes; } - } - /** - * Get the next batch of primary table values for the given columns - * @param refs columns to match against - * @param kvs - * @param currentRow - * @return the total length of all values found and the entries to add for the index - */ - private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, - byte[] currentRow) throws IOException { - int totalValueLength = 0; - List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size()); - - // pull out the latest state for each column reference, in order - for (CoveredColumn ref : refs) { - KeyValue first = ref.getFirstKeyValueForRow(currentRow); - if (!kvs.seek(first)) { - // no more keys, so add a null value - entries.add(new ColumnEntry(null, ref)); - continue; - } - // there is a next value - we only care about the current value, so we can just snag that - Cell next = kvs.next(); - if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) { - byte[] v = next.getValue(); - totalValueLength += v.length; - entries.add(new ColumnEntry(v, ref)); - } else { - // this first one didn't match at all, so we have to put in a null entry - entries.add(new ColumnEntry(null, ref)); - continue; - } - // here's where is gets a little tricky - we either need to decide if we should continue - // adding entries (matches all qualifiers) or if we are done (matches a single qualifier) - if (!ref.allColumns()) { - continue; - } - // matches all columns, so we need to iterate until we hit the next column with the same - // family as the current key - byte[] lastQual = next.getQualifier(); - byte[] nextQual = null; - while ((next = kvs.next()) != null) { - // different family, done with this column - if (!ref.matchesFamily(next.getFamily())) { - break; + /** + * Get all the deletes necessary for a group of columns - logically, the cleanup the index table for a given index. 
+ * + * @param group + * index information + * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary + */ + private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) { + List<CoveredColumn> refs = group.getColumns(); + try { + Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs); + Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey()); + // make sure we close the scanner reference + kvs.getFirst().close(); + // no change, just return the passed update + if (columns.getFirst() == 0) { return kvs.getSecond(); } + // have all the column entries, so just turn it into a Delete for the row + // convert the entries to the needed values + byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond()); + Delete d = new Delete(rowKey); + d.setTimestamp(state.getCurrentTimestamp()); + IndexUpdate update = kvs.getSecond(); + update.setUpdate(d); + update.setTable(Bytes.toBytes(group.getTable())); + return update; + } catch (IOException e) { + throw new RuntimeException("Unexpected exception when getting state for columns: " + refs); } + } + + /** + * Get the next batch of primary table values for the given columns + * + * @param refs + * columns to match against + * @param kvs + * @return the total length of all values found and the entries to add for the index + */ + private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow) + throws IOException { + int totalValueLength = 0; + List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size()); + + // pull out the latest state for each column reference, in order + for (CoveredColumn ref : refs) { + KeyValue first = ref.getFirstKeyValueForRow(currentRow); + if (!kvs.seek(first)) { + // no more keys, so add a null value + entries.add(new ColumnEntry(null, ref)); + continue; + } + // there is a next value - we only care about the current value, so we can just snag that + Cell next = kvs.next(); + if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) { + byte[] v = next.getValue(); + totalValueLength += v.length; + entries.add(new ColumnEntry(v, ref)); + } else { + // this first one didn't match at all, so we have to put in a null entry + entries.add(new ColumnEntry(null, ref)); + continue; + } + // here's where it gets a little tricky - we either need to decide if we should continue + // adding entries (matches all qualifiers) or if we are done (matches a single qualifier) + if (!ref.allColumns()) { + continue; + } + // matches all columns, so we need to iterate until we hit the next column with the same + // family as the current key + byte[] lastQual = next.getQualifier(); + byte[] nextQual = null; + while ((next = kvs.next()) != null) { + // different family, done with this column + if (!ref.matchesFamily(next.getFamily())) { + break; + } + nextQual = next.getQualifier(); + // we are still on the same qualifier - skip it, since we already added a column for it + if (Arrays.equals(lastQual, nextQual)) { + continue; + } + // this must match the qualifier since it's an all-qualifiers specifier, so we add it + byte[] v = next.getValue(); + totalValueLength += v.length; + entries.add(new ColumnEntry(v, ref)); + // update 
+
+ /**
+ * Get the next batch of primary table values for the given columns
+ *
+ * @param refs
+ *            columns to match against
+ * @param kvs
+ * @param currentRow
+ * @return the total length of all values found and the entries to add for the index
+ */
+ private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
+ throws IOException {
+ int totalValueLength = 0;
+ List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
+
+ // pull out the latest state for each column reference, in order
+ for (CoveredColumn ref : refs) {
+ KeyValue first = ref.getFirstKeyValueForRow(currentRow);
+ if (!kvs.seek(first)) {
+ // no more keys, so add a null value
+ entries.add(new ColumnEntry(null, ref));
+ continue;
+ }
+ // there is a next value - we only care about the current value, so we can just snag that
+ Cell next = kvs.next();
+ if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
+ byte[] v = next.getValue();
+ totalValueLength += v.length;
+ entries.add(new ColumnEntry(v, ref));
+ } else {
+ // this first one didn't match at all, so we have to put in a null entry
+ entries.add(new ColumnEntry(null, ref));
+ continue;
+ }
+ // here's where it gets a little tricky - we either need to decide if we should continue
+ // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
+ if (!ref.allColumns()) {
+ continue;
+ }
+ // matches all columns, so we need to iterate until we hit the next column with the same
+ // family as the current key
+ byte[] lastQual = next.getQualifier();
+ byte[] nextQual = null;
+ while ((next = kvs.next()) != null) {
+ // different family, done with this column
+ if (!ref.matchesFamily(next.getFamily())) {
+ break;
+ }
+ nextQual = next.getQualifier();
+ // we are still on the same qualifier - skip it, since we already added a column for it
+ if (Arrays.equals(lastQual, nextQual)) {
+ continue;
+ }
+ // this must match the qualifier since it's an all-qualifiers specifier, so we add it
+ byte[] v = next.getValue();
+ totalValueLength += v.length;
+ entries.add(new ColumnEntry(v, ref));
+ // update the last qualifier to check against
+ lastQual = nextQual;
+ }
 }
- // this must match the qualifier since its an all-qualifiers specifier, so we add it
- byte[] v = next.getValue();
- totalValueLength += v.length;
- entries.add(new ColumnEntry(v, ref));
- // update the last qualifier to check against
- lastQual = nextQual;
- }
+ return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
 }
- return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
- }
- static class ColumnEntry {
- byte[] value = EMPTY_BYTES;
- CoveredColumn ref;
+ static class ColumnEntry {
+ byte[] value = EMPTY_BYTES;
+ CoveredColumn ref;
- public ColumnEntry(byte[] value, CoveredColumn ref) {
- this.value = value == null ? EMPTY_BYTES : value;
- this.ref = ref;
+ public ColumnEntry(byte[] value, CoveredColumn ref) {
+ this.value = value == null ? EMPTY_BYTES : value;
+ this.ref = ref;
+ }
 }
- }
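The qualifier handling in getNextEntries is subtle enough to restate in isolation: a single-qualifier reference takes exactly one cell, while an all-qualifiers reference consumes every distinct qualifier in the family, skipping duplicates that differ only by timestamp. A simplified sketch over plain arrays follows; SimpleCell is a stand-in, not the HBase Cell API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LatestValueSketch {
    /** Stand-in cell: (family, qualifier, value), assumed sorted like an HBase scan. */
    static class SimpleCell {
        final byte[] family, qualifier, value;
        SimpleCell(byte[] f, byte[] q, byte[] v) { family = f; qualifier = q; value = v; }
    }

    // Collect one value per distinct qualifier in the given family, assuming the
    // iterator yields cells ordered by (family, qualifier, timestamp descending).
    static List<byte[]> valuesForFamily(Iterator<SimpleCell> cells, byte[] family) {
        List<byte[]> values = new ArrayList<byte[]>();
        byte[] lastQual = null;
        while (cells.hasNext()) {
            SimpleCell cell = cells.next();
            if (!Arrays.equals(cell.family, family)) break; // different family, done
            if (lastQual != null && Arrays.equals(lastQual, cell.qualifier)) continue; // older version
            values.add(cell.value); // newest cell for this qualifier
            lastQual = cell.qualifier;
        }
        return values;
    }
}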
- /**
- * Compose the final index row key.
- * <p>
- * This is faster than adding each value independently as we can just build a single a array and
- * copy everything over once.
- * @param pk primary key of the original row
- * @param length total number of bytes of all the values that should be added
- * @param values to use when building the key
- */
- static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
- // now build up expected row key, each of the values, in order, followed by the PK and then some
- // info about lengths so we can deserialize each value
- byte[] output = new byte[length + pk.length];
- int pos = 0;
- int[] lengths = new int[values.size()];
- int i = 0;
- for (ColumnEntry entry : values) {
- byte[] v = entry.value;
- // skip doing the copy attempt, if we don't need to
- if (v.length != 0) {
- System.arraycopy(v, 0, output, pos, v.length);
- pos += v.length;
- }
- lengths[i++] = v.length;
- }
+ /**
+ * Compose the final index row key.
+ * <p>
+ * This is faster than adding each value independently as we can just build a single array and copy everything
+ * over once.
+ *
+ * @param pk
+ *            primary key of the original row
+ * @param length
+ *            total number of bytes of all the values that should be added
+ * @param values
+ *            to use when building the key
+ */
+ static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+ // now build up expected row key, each of the values, in order, followed by the PK and then some
+ // info about lengths so we can deserialize each value
+ byte[] output = new byte[length + pk.length];
+ int pos = 0;
+ int[] lengths = new int[values.size()];
+ int i = 0;
+ for (ColumnEntry entry : values) {
+ byte[] v = entry.value;
+ // skip doing the copy attempt, if we don't need to
+ if (v.length != 0) {
+ System.arraycopy(v, 0, output, pos, v.length);
+ pos += v.length;
+ }
+ lengths[i++] = v.length;
+ }
+
+ // add the primary key to the end of the row key
+ System.arraycopy(pk, 0, output, pos, pk.length);
- // add the primary key to the end of the row key
- System.arraycopy(pk, 0, output, pos, pk.length);
+ // add the lengths as suffixes so we can deserialize the elements again
+ for (int l : lengths) {
+ output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ }
- // add the lengths as suffixes so we can deserialize the elements again
- for (int l : lengths) {
- output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ // and the last integer is the number of values
+ return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
 }
- // and the last integer is the number of values
- return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
- }
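The resulting row-key layout is: each value in order, then the original primary key, then one big-endian int per value length (in the same order), and finally an int holding the value count. A self-contained sketch of that encoding, with ByteBuffer standing in for the Bytes.toBytes(int)/ArrayUtils.addAll helpers used above:

import java.nio.ByteBuffer;
import java.util.List;

public class RowKeyEncodeSketch {
    // Layout: [v0][v1]...[pk][len(v0)][len(v1)]...[count], ints are 4-byte big-endian,
    // matching composeRowKey above (null values are encoded as zero-length).
    static byte[] encode(byte[] pk, List<byte[]> values) {
        int totalValueLength = 0;
        for (byte[] v : values) totalValueLength += v.length;
        ByteBuffer out = ByteBuffer.allocate(
            totalValueLength + pk.length + 4 * (values.size() + 1));
        for (byte[] v : values) out.put(v);           // the values, in order
        out.put(pk);                                  // then the primary key
        for (byte[] v : values) out.putInt(v.length); // then each length
        out.putInt(values.size());                    // and last, the number of values
        return out.array();
    }
}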
+ /**
+ * Essentially a short-cut for building a {@link Put}.
+ *
+ * @param pk
+ *            row key
+ * @param timestamp
+ *            timestamp of all the keyvalues
+ * @param values
+ *            expected value--column pair
+ * @return the keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs.
+ */
+ public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
+ List<Pair<byte[], CoveredColumn>> values) {
+
+ int length = 0;
+ List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
+ for (Pair<byte[], CoveredColumn> value : values) {
+ ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
+ length += value.getFirst().length;
+ expected.add(entry);
+ }
- /**
- * Essentially a short-cut from building a {@link Put}.
- * @param pk row key
- * @param timestamp timestamp of all the keyvalues
- * @param values expected value--column pair
- * @return a keyvalues that the index contains for a given row at a timestamp with the given value
- * -- column pairs.
- */
- public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
- List<Pair<byte[], CoveredColumn>> values) {
-
- int length = 0;
- List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
- for (Pair<byte[], CoveredColumn> value : values) {
- ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
- length += value.getFirst().length;
- expected.add(entry);
- }
-
- byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
- Put p = new Put(rowKey, timestamp);
- CoveredColumnIndexCodec.addColumnsToPut(p, expected);
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
- kvs.addAll(entry.getValue());
- }
-
- return kvs;
- }
+ byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
+ Put p = new Put(rowKey, timestamp);
+ CoveredColumnIndexCodec.addColumnsToPut(p, expected);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
+ kvs.addAll(entry.getValue());
+ }
- public static List<byte[]> getValues(byte[] bytes) {
- // get the total number of keys in the bytes
- int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
- List<byte[]> keys = new ArrayList<byte[]>(keyCount);
- int[] lengths = new int[keyCount];
- int lengthPos = keyCount - 1;
- int pos = bytes.length - Bytes.SIZEOF_INT;
- // figure out the length of each key
- for (int i = 0; i < keyCount; i++) {
- lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
- pos -= Bytes.SIZEOF_INT;
+ return kvs;
 }
- int current = 0;
- for (int length : lengths) {
- byte[] key = Arrays.copyOfRange(bytes, current, current + length);
- keys.add(key);
- current += length;
- }
+ public static List<byte[]> getValues(byte[] bytes) {
+ // get the total number of keys in the bytes
+ int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+ List<byte[]> keys = new ArrayList<byte[]>(keyCount);
+ int[] lengths = new int[keyCount];
+ int lengthPos = keyCount - 1;
+ int pos = bytes.length - Bytes.SIZEOF_INT;
+ // figure out the length of each key
+ for (int i = 0; i < keyCount; i++) {
+ lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+ pos -= Bytes.SIZEOF_INT;
+ }
- return keys;
- }
+ int current = 0;
+ for (int length : lengths) {
+ byte[] key = Arrays.copyOfRange(bytes, current, current + length);
+ keys.add(key);
+ current += length;
+ }
- /**
- * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
- * @param bytes array to read from
- * @param start start point, backwards from which to read. For example, if specifying "25", we
- * would try to read an integer from 21 -> 25
- * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists.
- */
- private static int getPreviousInteger(byte[] bytes, int start) {
- return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
- }
+ return keys;
+ }
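Decoding mirrors the layout from the tail: getValues reads the count from the last four bytes, walks the length suffixes backwards into an array (via getPreviousInteger), and only then slices the values off the front; the primary-key bytes in the middle are never touched. A stand-alone sketch of the same walk, under the layout assumed in the encode sketch above:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowKeyDecodeSketch {
    // Inverse of the layout above: count at the end, lengths just before it,
    // values at the front; the pk bytes between them are skipped entirely.
    static List<byte[]> decode(byte[] rowKey) {
        ByteBuffer in = ByteBuffer.wrap(rowKey);
        int count = in.getInt(rowKey.length - 4); // the last int is the value count
        int[] lengths = new int[count];
        int pos = rowKey.length - 4;
        for (int i = count - 1; i >= 0; i--) {    // read lengths backwards, like getPreviousInteger
            pos -= 4;
            lengths[i] = in.getInt(pos);
        }
        List<byte[]> values = new ArrayList<byte[]>(count);
        int current = 0;
        for (int length : lengths) {              // slice the values off the front
            values.add(Arrays.copyOfRange(rowKey, current, current + length));
            current += length;
        }
        return values;
    }
}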
- /**
- * Check to see if an row key just contains a list of null values.
- * @param bytes row key to examine
- * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
- */
- public static boolean checkRowKeyForAllNulls(byte[] bytes) {
- int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
- int pos = bytes.length - Bytes.SIZEOF_INT;
- for (int i = 0; i < keyCount; i++) {
- int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
- if (next > 0) {
- return false;
- }
- pos -= Bytes.SIZEOF_INT;
+ /**
+ * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
+ *
+ * @param bytes
+ *            array to read from
+ * @param start
+ *            start point, backwards from which to read. For example, if specifying "25", we would try to read an
+ *            integer from 21 -> 25
+ * @return an integer from the preceding {@value Bytes#SIZEOF_INT} bytes, if it exists.
+ */
+ private static int getPreviousInteger(byte[] bytes, int start) {
+ return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
 }
- return true;
- }
+ /**
+ * Check to see if a row key just contains a list of null values.
+ *
+ * @param bytes
+ *            row key to examine
+ * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
+ */
+ public static boolean checkRowKeyForAllNulls(byte[] bytes) {
+ int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+ int pos = bytes.length - Bytes.SIZEOF_INT;
+ for (int i = 0; i < keyCount; i++) {
+ int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+ if (next > 0) { return false; }
+ pos -= Bytes.SIZEOF_INT;
+ }
- @Override
- public boolean isEnabled(Mutation m) {
- // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
- // simple check for the moment.
- return groups.size() > 0;
- }
+ return true;
+ }
+
+ @Override
+ public boolean isEnabled(Mutation m) {
+ // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
+ // simple check for the moment.
+ return groups.size() > 0;
+ }
 }
\ No newline at end of file
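Note that checkRowKeyForAllNulls needs none of the value bytes: it can answer from the length suffixes alone, which is why it only walks ints backwards from the tail. Under the same assumed layout as the sketches above:

import java.nio.ByteBuffer;

public class AllNullsSketch {
    // True when every encoded value is zero-length, reading only the int suffixes.
    static boolean allNulls(byte[] rowKey) {
        ByteBuffer in = ByteBuffer.wrap(rowKey);
        int count = in.getInt(rowKey.length - 4);
        int pos = rowKey.length - 4;
        for (int i = 0; i < count; i++) {
            pos -= 4;
            if (in.getInt(pos) > 0) return false; // a non-empty value exists
        }
        return true;
    }
}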
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
index 6ac89d1..48c714d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 /**
@@ -136,7 +136,7 @@ public class CoveredColumnIndexSpecifierBuilder {
 void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException {
 // add the codec for the index to the map of options
 Map<String, String> opts = this.convertToMap();
- opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
+ opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
 Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER);
 }
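For reference, wiring this up end to end looks roughly like the following: the codec class is stashed in the indexer options under NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, and the coprocessor is registered on the table descriptor. This sketch uses only the calls visible in the hunk above; the descriptor setup around it is assumed.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexer;

public class EnableIndexingSketch {
    static void enable(HTableDescriptor desc, Class<? extends IndexCodec> codecClass)
            throws IOException {
        Map<String, String> opts = new HashMap<String, String>();
        // tell the builder which codec to instantiate, as build() does above
        opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, codecClass.getName());
        Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER);
    }
}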
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index f80cf41..60698c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -35,8 +36,9 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 /**
@@ -90,7 +92,7 @@ import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 * <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of the entire row
 * <i>every time there is a write to the table</i>.
 */
-public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
+public class CoveredColumnIndexer extends NonTxIndexBuilder {
 /**
 * Create the specified index table with the necessary columns
@@ -117,17 +119,16 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
 @Override
 public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
- Collection<KeyValue> filtered) throws IOException {
-
+ Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException {
 // stores all the return values
 IndexUpdateManager updateMap = new IndexUpdateManager();
 // batch the updates by row to make life easier and ordered
 Collection<Batch> batches = batchByRow(filtered);
 for (Batch batch : batches) {
- KeyValue curKV = batch.getKvs().iterator().next();
+ Cell curKV = batch.getKvs().iterator().next();
 Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength());
- for (KeyValue kv : batch.getKvs()) {
+ for (Cell kv : batch.getKvs()) {
 // we only need to cleanup Put entries
 byte type = kv.getTypeByte();
 Type t = KeyValue.Type.codeToType(type);
@@ -145,7 +146,7 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
 for (Batch entry : timeBatch) {
 //just set the timestamp on the table - it already has all the future state
 state.setCurrentTimestamp(entry.getTimestamp());
- this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp());
+ this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp(), indexMetaData);
 }
 }
 return updateMap.toMap();
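batchByRow (not shown in this hunk) groups the filtered cells by row so that one cleanup pass is made per row. The grouping itself is a generic pattern: wrap the row bytes so that content, not array identity, drives the map lookup. A stand-alone sketch (RowKey and RowExtractor are stand-ins, not Phoenix types):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchByRowSketch {
    /** Wraps row bytes so byte[] content (not identity) drives equals/hashCode. */
    static final class RowKey {
        final byte[] row;
        RowKey(byte[] row) { this.row = row; }
        @Override public boolean equals(Object o) {
            return o instanceof RowKey && Arrays.equals(row, ((RowKey) o).row);
        }
        @Override public int hashCode() { return Arrays.hashCode(row); }
    }

    interface RowExtractor<V> { byte[] rowOf(V cell); }

    // Group cells by row, preserving encounter order within each batch.
    static <V> Map<RowKey, List<V>> batchByRow(Iterable<V> cells, RowExtractor<V> rows) {
        Map<RowKey, List<V>> batches = new HashMap<RowKey, List<V>>();
        for (V cell : cells) {
            RowKey key = new RowKey(rows.rowOf(cell));
            List<V> batch = batches.get(key);
            if (batch == null) {
                batch = new ArrayList<V>();
                batches.put(key, batch);
            }
            batch.add(cell);
        }
        return batches;
    }
}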
extends ColumnReference> columns) {
 this.columns = new ArrayList<ColumnReference>(columns);
 // sort the columns
- Collections.sort(this.columns);
+ // no need to do this: Collections.sort(this.columns);
 this.hashCode = calcHashCode(this.columns);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index d09498a..26f620f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
 /**
 * Keeps track of the index updates
 */
@@ -182,8 +181,6 @@ public class IndexUpdateManager {
 for (Entry<ImmutableBytesPtr, Collection<Mutation>> updates : map.entrySet()) {
 // get is ok because we always set with just the bytes
 byte[] tableName = updates.getKey().get();
- // TODO replace this as just storing a byte[], to avoid all the String <-> byte[] swapping
- // HBase does
 for (Mutation m : updates.getValue()) {
 // skip elements that have been marked for delete
 if (shouldBeRemoved(m)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
index a28268c..884cca6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.hbase.index.scanner;
 import java.io.IOException;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 /**
@@ -29,17 +29,17 @@ import org.apache.hadoop.hbase.KeyValue;
 public class EmptyScanner implements Scanner {
 @Override
- public KeyValue next() throws IOException {
+ public Cell next() throws IOException {
 return null;
 }
 @Override
- public boolean seek(KeyValue next) throws IOException {
+ public boolean seek(Cell next) throws IOException {
 return false;
 }
 @Override
- public KeyValue peek() throws IOException {
+ public Cell peek() throws IOException {
 return null;
 }
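The IndexUpdateManager hunk above relies on a mark-then-skip pattern: superseded mutations stay in the per-table collection but are tagged for removal and filtered out when the map is serialized (the shouldBeRemoved(m) check). The pattern in isolation, with PendingUpdate as a stand-in type:

import java.util.ArrayList;
import java.util.List;

public class MarkAndSkipSketch {
    static class PendingUpdate {
        final String description;
        boolean removed; // marked when a later update supersedes this one
        PendingUpdate(String description) { this.description = description; }
    }

    // Serialization-time filter, mirroring the shouldBeRemoved(m) check in toMap().
    static List<PendingUpdate> live(List<PendingUpdate> all) {
        List<PendingUpdate> result = new ArrayList<PendingUpdate>();
        for (PendingUpdate u : all) {
            if (u.removed) continue; // skip elements that have been marked for delete
            result.add(u);
        }
        return result;
    }
}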
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
@@ -44,7 +44,7 @@ public interface Scanner extends Closeable {
 * @return <tt>true</tt> if there are values left in <tt>this</tt>, <tt>false</tt> otherwise
 * @throws IOException if there is an error reading the underlying data.
 */
- public boolean seek(KeyValue next) throws IOException;
+ public boolean seek(Cell next) throws IOException;
 /**
 * Read the {@link KeyValue} at the top of <tt>this</tt> without 'popping' it off the top of the

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index ff33ec2..e120268 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -34,8 +34,6 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.hbase.index.covered.KeyValueStore;
 import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
 import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
@@ -43,6 +41,8 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import com.google.common.collect.Lists;
+
 /**
 *
 */
@@ -59,6 +59,7 @@ public class ScannerBuilder {
 public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
+ // TODO: This needs to use some form of the filter that Tephra has when transactional
 Filter columnFilters = getColumnFilters(indexedColumns);
 FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
@@ -136,7 +137,7 @@ public class ScannerBuilder {
 }
 @Override
- public boolean seek(KeyValue next) throws IOException {
+ public boolean seek(Cell next) throws IOException {
 // check to see if the next kv is after the current key, in which case we can use reseek,
 // which will be more efficient
 Cell peek = kvScanner.peek();
 if (peek != null) {
 int compare = KeyValue.COMPARATOR.compare(peek, next);
 if (compare < 0) {
- return kvScanner.reseek(next);
+ return kvScanner.reseek(KeyValueUtil.ensureKeyValue(next));
 } else if (compare == 0) {
 // we are already at the given key!
 return true;
 }
 }
- return kvScanner.seek(next);
+ return kvScanner.seek(KeyValueUtil.ensureKeyValue(next));
 }
 @Override
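The seek override at the end of this hunk implements a common forward-seek optimization: compare the requested key with the scanner's current position, use the cheaper reseek when only moving forward, short-circuit when already positioned on the key, and fall back to a full seek otherwise. The decision logic in isolation (Positionable is a stand-in for the underlying KeyValueScanner calls):

public class SeekSketch {
    interface Positionable<K extends Comparable<K>> {
        K peek();                  // current position, or null if exhausted
        boolean seek(K target);    // full repositioning, may go backwards
        boolean reseek(K target);  // cheaper, forward-only repositioning
    }

    // Mirrors the compare/reseek/seek branching in ScannerBuilder's seek(Cell).
    static <K extends Comparable<K>> boolean smartSeek(Positionable<K> scanner, K target) {
        K current = scanner.peek();
        if (current != null) {
            int compare = current.compareTo(target);
            if (compare < 0) return scanner.reseek(target); // target is ahead: cheap path
            if (compare == 0) return true;                  // already at the given key
        }
        return scanner.seek(target);                        // behind or empty: full seek
    }
}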
