Repository: phoenix Updated Branches: refs/heads/txn 974329cd1 -> 433495482
Add isValid check for Put index updates for consistency Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/43349548 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/43349548 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/43349548 Branch: refs/heads/txn Commit: 4334954829246d20b5a67ce6bf6b0abb1a7164af Parents: 974329c Author: James Taylor <[email protected]> Authored: Sat Apr 18 18:31:13 2015 -0700 Committer: James Taylor <[email protected]> Committed: Sat Apr 18 18:31:13 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/index/PhoenixIndexCodec.java | 84 -------------------- .../index/PhoenixTransactionalIndexer.java | 16 ++-- 2 files changed, 10 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/43349548/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 956e5ea..109de84 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -19,7 +19,6 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; @@ -29,7 +28,6 @@ import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; -import org.apache.phoenix.util.MetaDataUtil; import com.google.common.collect.Lists; @@ -65,32 +63,16 @@ public class PhoenixIndexCodec extends BaseIndexCodec { if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) { return Collections.emptyList(); } - // TODO: confirm that this special case isn't needed - // (as state should match this with the above call, since there are no mutable columns) - /* - if (maintainer.isImmutableRows()) { - indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns())); - indexUpdate.setTable(maintainer.getIndexTableName()); - valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate()); - } - */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); ptr.set(state.getCurrentRowKey()); List<IndexUpdate> indexUpdates = Lists.newArrayList(); for (IndexMaintainer maintainer : indexMaintainers) { - if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes - assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0); - } Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); ValueGetter valueGetter = statePair.getFirst(); IndexUpdate indexUpdate = statePair.getSecond(); indexUpdate.setTable(maintainer.getIndexTableName()); Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env .getRegion().getStartKey(), env.getRegion().getEndKey()); - if (put == null) { - throw new IllegalStateException("Null put for " + env.getRegion().getRegionInfo().getTable().getNameAsString() - + ": " + Bytes.toStringBinary(ptr.get(), ptr.getOffset(), ptr.getLength())); - } indexUpdate.setUpdate(put); indexUpdates.add(indexUpdate); } @@ -104,9 +86,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec { ptr.set(state.getCurrentRowKey()); List<IndexUpdate> indexUpdates = Lists.newArrayList(); for (IndexMaintainer maintainer : indexMaintainers) { - if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes - assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0); - } if (maintainer.isImmutableRows()) { continue; } @@ -122,69 +101,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec { return indexUpdates; } - /* - @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state, BatchContext context) throws IOException { - return getIndexUpdates(state, context, true); - } - - @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state, BatchContext context) throws IOException { - return getIndexUpdates(state, context, false); - } - - private Iterable<IndexUpdate> getIndexUpdates(TableState state, BatchContext context, boolean upsert) throws IOException { - List<IndexMaintainer> indexMaintainers = ((PhoenixBatchContext)context).getIndexMetaData().getIndexMaintainers(); - if (indexMaintainers.isEmpty()) { return Collections.emptyList(); } - List<IndexUpdate> indexUpdates = Lists.newArrayList(); - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy - byte[] dataRowKey = state.getCurrentRowKey(); - ptr.set(dataRowKey); - byte[] localIndexTableName = MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName()); - ValueGetter valueGetter = null; - Scanner scanner = null; - for (IndexMaintainer maintainer : indexMaintainers) { - if (upsert) { - // Short-circuit building state when we know it's a row deletion - if (maintainer.isRowDeleted(state.getPendingUpdate())) { - continue; - } - } - IndexUpdate indexUpdate = null; - if (maintainer.isImmutableRows()) { - indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns())); - if (maintainer.isLocalIndex()) { - indexUpdate.setTable(localIndexTableName); - } else { - indexUpdate.setTable(maintainer.getIndexTableName()); - } - valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate()); - } else { - Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); - valueGetter = statePair.getFirst(); - indexUpdate = statePair.getSecond(); - indexUpdate.setTable(maintainer.getIndexTableName()); - } - Mutation mutation = null; - if (upsert) { - mutation = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env - .getRegion().getStartKey(), env.getRegion().getEndKey()); - } else { - mutation = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), - state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey()); - } - indexUpdate.setUpdate(mutation); - if (scanner != null) { - scanner.close(); - scanner = null; - } - indexUpdates.add(indexUpdate); - } - return indexUpdates; - } - */ - @Override public boolean isEnabled(Mutation m) throws IOException { return hasIndexMaintainers(m.getAttributesMap()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/43349548/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index d77f7e6..6a13552 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -195,18 +195,22 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); } state.applyMutation(); - Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData); - for (IndexUpdate update : updates) { - indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName())); + Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); + for (IndexUpdate put : puts) { + if (put.isValid()) { + indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName())); + } } } } for (Mutation m : mutations.values()) { TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m); state.applyMutation(); - Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData); - for (IndexUpdate update : updates) { - indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName())); + Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); + for (IndexUpdate put : puts) { + if (put.isValid()) { + indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName())); + } } } } finally {
