PHOENIX-1830 Transactional mutable secondary indexes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e1521ab0 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e1521ab0 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e1521ab0 Branch: refs/heads/txn Commit: e1521ab059b51a81738e5b9fd8390034dd1b9f1c Parents: 0ad5c56 Author: James Taylor <[email protected]> Authored: Thu Apr 9 00:08:11 2015 -0700 Committer: James Taylor <[email protected]> Committed: Thu Apr 9 00:08:11 2015 -0700 ---------------------------------------------------------------------- .../end2end/index/TxGlobalMutableIndexIT.java | 41 +++ .../EndToEndCoveredColumnsIndexBuilderIT.java | 8 +- .../covered/example/FailWithoutRetriesIT.java | 12 +- .../org/apache/phoenix/hbase/index/Indexer.java | 54 --- .../phoenix/hbase/index/MultiMutation.java | 86 +++++ .../hbase/index/builder/BaseIndexBuilder.java | 23 +- .../hbase/index/builder/BaseIndexCodec.java | 47 +++ .../hbase/index/builder/IndexBuildManager.java | 18 +- .../hbase/index/builder/IndexBuilder.java | 19 +- .../phoenix/hbase/index/covered/IndexCodec.java | 23 +- .../hbase/index/covered/IndexMetaData.java | 22 ++ .../hbase/index/covered/LocalTableState.java | 10 +- .../hbase/index/covered/NonTxIndexBuilder.java | 65 ++-- .../phoenix/hbase/index/covered/TableState.java | 2 - .../hbase/index/covered/TxIndexBuilder.java | 247 -------------- .../example/CoveredColumnIndexCodec.java | 10 +- .../covered/example/CoveredColumnIndexer.java | 6 +- .../apache/phoenix/index/BaseIndexCodec.java | 59 ---- .../phoenix/index/PhoenixIndexBuilder.java | 83 ++++- .../apache/phoenix/index/PhoenixIndexCodec.java | 160 ++++----- .../phoenix/index/PhoenixIndexMetaData.java | 97 ++++++ .../index/PhoenixTransactionalIndexer.java | 339 +++++++++++++++++++ .../phoenix/index/PhoenixTxIndexBuilder.java | 53 --- .../index/PhoenixTxIndexFailurePolicy.java | 50 --- .../query/ConnectionQueryServicesImpl.java | 19 +- 
.../apache/phoenix/query/QueryConstants.java | 10 +- .../phoenix/trace/PhoenixMetricsSink.java | 41 ++- .../java/org/apache/phoenix/util/IndexUtil.java | 62 ---- .../covered/CoveredIndexCodecForTesting.java | 9 +- .../example/TestCoveredColumnIndexCodec.java | 9 +- 30 files changed, 909 insertions(+), 775 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java new file mode 100644 index 0000000..2d22b0f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.end2end.index; + +import java.util.Map; + +import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; + +import com.google.common.collect.Maps; + +public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT { + @BeforeClass + @Shadower(classBeingShadowed = BaseMutableIndexIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan + // Forces server cache to be used + props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2)); + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index fa85f00..1cdd508 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@ -172,15 +172,15 @@ public class EndToEndCoveredColumnsIndexBuilderIT { private Queue<TableStateVerifier> verifiers = new ArrayDeque<TableStateVerifier>(); @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) { + public Iterable<IndexUpdate> 
getIndexDeletes(TableState state, IndexMetaData context) { verify(state); - return super.getIndexDeletes(state); + return super.getIndexDeletes(state, context); } @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) { + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) { verify(state); - return super.getIndexUpserts(state); + return super.getIndexUpserts(state, context); } private void verify(TableState state) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java index 281ad63..6367945 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.Bytes; @@ -32,10 +31,11 @@ import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.hbase.index.IndexTestingUtils; import org.apache.phoenix.hbase.index.Indexer; import org.apache.phoenix.hbase.index.TableName; +import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.IndexUpdate; import 
org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; -import org.apache.phoenix.index.BaseIndexCodec; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -63,18 +63,14 @@ public class FailWithoutRetriesIT { public static class FailingTestCodec extends BaseIndexCodec { @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException { + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException { throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName()); } @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException { + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException { throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName()); } - - @Override - public void setContext(TableState state, Mutation mutation) throws IOException {} - } @BeforeClass http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index a4fc96b..d79ffb5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -25,12 +25,10 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; import 
org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -282,58 +280,6 @@ public class Indexer extends BaseRegionObserver { } } - private class MultiMutation extends Mutation { - - private ImmutableBytesPtr rowKey; - - public MultiMutation(ImmutableBytesPtr rowkey) { - this.rowKey = rowkey; - } - - /** - * @param stored - */ - public void addAll(Mutation stored) { - // add all the kvs - for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) { - byte[] family = kvs.getKey(); - List<Cell> list = getKeyValueList(family, kvs.getValue().size()); - list.addAll(kvs.getValue()); - familyMap.put(family, list); - } - - // add all the attributes, not overriding already stored ones - for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) { - if (this.getAttribute(attrib.getKey()) == null) { - this.setAttribute(attrib.getKey(), attrib.getValue()); - } - } - } - - private List<Cell> getKeyValueList(byte[] family, int hint) { - List<Cell> list = familyMap.get(family); - if (list == null) { - list = new ArrayList<Cell>(hint); - } - return list; - } - - @Override - public byte[] getRow(){ - return this.rowKey.copyBytesIfNecessary(); - } - - @Override - public int hashCode() { - return this.rowKey.hashCode(); - } - - @Override - public boolean equals(Object o) { - return o == null ? false : o.hashCode() == this.hashCode(); - } - } - /** * Add the index updates to the WAL, or write to the index table, if the WAL has been disabled * @return <tt>true</tt> if the WAL has been updated. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java new file mode 100644 index 0000000..f6381c4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/MultiMutation.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.hbase.index; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; + +public class MultiMutation extends Mutation { + + private ImmutableBytesPtr rowKey; + + public MultiMutation(ImmutableBytesPtr rowkey) { + this.rowKey = rowkey; + } + + /** + * @param stored + */ + public void addAll(Mutation stored) { + // add all the kvs + for (Entry<byte[], List<Cell>> kvs : stored.getFamilyCellMap().entrySet()) { + byte[] family = kvs.getKey(); + List<Cell> list = getKeyValueList(family, kvs.getValue().size()); + list.addAll(kvs.getValue()); + familyMap.put(family, list); + } + + // add all the attributes, not overriding already stored ones + for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) { + if (this.getAttribute(attrib.getKey()) == null) { + this.setAttribute(attrib.getKey(), attrib.getValue()); + } + } + } + + private List<Cell> getKeyValueList(byte[] family, int hint) { + List<Cell> list = familyMap.get(family); + if (list == null) { + list = new ArrayList<Cell>(hint); + } + return list; + } + + @Override + public byte[] getRow(){ + return this.rowKey.copyBytesIfNecessary(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((rowKey == null) ? 
0 : rowKey.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + MultiMutation other = (MultiMutation)obj; + return rowKey.equals(other.rowKey); + } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index dfb9ad4..4e329e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.IndexCodec; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; @@ -40,8 +41,6 @@ public abstract class BaseIndexBuilder implements IndexBuilder { protected RegionCoprocessorEnvironment env; protected IndexCodec codec; - abstract protected boolean useRawScanToPrimeBlockCache(); - @Override public void extendBaseIndexBuilderInstead() {} @@ -65,9 +64,14 @@ public abstract class BaseIndexBuilder implements IndexBuilder { } @Override - public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException { // 
noop } + + @Override + public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + return IndexMetaData.NULL_INDEX_META_DATA; + } @Override public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) { @@ -98,22 +102,11 @@ public abstract class BaseIndexBuilder implements IndexBuilder { } @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered) + public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData context) throws IOException { throw new UnsupportedOperationException(); } - /** - * {@inheritDoc} - * <p> - * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each mutation always applies - * to different rows, even if they are in the same batch, or are independent updates. - */ - @Override - public byte[] getBatchId(Mutation m) { - return this.codec.getBatchId(m); - } - @Override public void stop(String why) { LOG.debug("Stopping because: " + why); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java new file mode 100644 index 0000000..1ce4e2e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.builder; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.hbase.index.covered.IndexCodec; + +/** + * + */ +public abstract class BaseIndexCodec implements IndexCodec { + + @Override + public void initialize(RegionCoprocessorEnvironment env) throws IOException { + // noop + } + + /** + * {@inheritDoc} + * <p> + * By default, the codec is always enabled. Subclasses should override this method if they want to + * decide to index on a per-mutation basis. 
+ * @throws IOException + */ + @Override + public boolean isEnabled(Mutation m) throws IOException { + return true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java index d5fd34d..ae2125e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner; import org.apache.phoenix.hbase.index.parallel.Task; import org.apache.phoenix.hbase.index.parallel.TaskBatch; @@ -115,7 +116,8 @@ public class IndexBuildManager implements Stoppable { MiniBatchOperationInProgress<Mutation> miniBatchOp, Collection<? 
extends Mutation> mutations) throws Throwable { // notify the delegate that we have started processing a batch - this.delegate.batchStarted(miniBatchOp); + final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp); + this.delegate.batchStarted(miniBatchOp, indexMetaData); // parallelize each mutation into its own task // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would @@ -129,7 +131,7 @@ public class IndexBuildManager implements Stoppable { @Override public Collection<Pair<Mutation, byte[]>> call() throws IOException { - return delegate.getIndexUpdate(m); + return delegate.getIndexUpdate(m, indexMetaData); } }); @@ -156,28 +158,24 @@ public class IndexBuildManager implements Stoppable { } public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( - Collection<KeyValue> filtered) throws IOException { + Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException { // this is run async, so we can take our time here - return delegate.getIndexUpdateForFilteredRows(filtered); + return delegate.getIndexUpdateForFilteredRows(filtered, indexMetaData); } public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) { delegate.batchCompleted(miniBatchOp); } - public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) + public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData indexMetaData) throws IOException { - delegate.batchStarted(miniBatchOp); + delegate.batchStarted(miniBatchOp, indexMetaData); } public boolean isEnabled(Mutation m) throws IOException { return delegate.isEnabled(m); } - public byte[] getBatchId(Mutation m) { - return delegate.getBatchId(m); - } - @Override public void stop(String why) { if (stopped) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index 194fdcc..36aba77 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; /** * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table @@ -64,6 +65,7 @@ public interface IndexBuilder extends Stoppable { * Implementers must ensure that this method is thread-safe - it could (and probably will) be * called concurrently for different mutations, which may or may not be part of the same batch. * @param mutation update to the primary table to be indexed. + * @param context TODO * @return a Map of the mutations to make -> target index table name * @throws IOException on failure */ @@ -76,7 +78,7 @@ public interface IndexBuilder extends Stoppable { Noop Failure mode */ - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException; + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException; /** * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction @@ -94,12 +96,13 @@ public interface IndexBuilder extends Stoppable { * * @param filtered {@link KeyValue}s that previously existed, but won't be included * in further output from HBase. 
+ * @param context TODO * * @return a {@link Map} of the mutations to make -> target index table name * @throws IOException on failure */ public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( - Collection<KeyValue> filtered) + Collection<KeyValue> filtered, IndexMetaData context) throws IOException; /** @@ -115,10 +118,13 @@ public interface IndexBuilder extends Stoppable { * <i>after</i> the {@link #getIndexUpdate} methods. Therefore, you will likely need an attribute * on your {@link Put}/{@link Delete} to indicate it is a batch operation. * @param miniBatchOp the full batch operation to be written + * @param context TODO * @throws IOException */ - public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException; + public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException; + public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException; + /** * This allows the codec to dynamically change whether or not indexing should take place for a * table. If it doesn't take place, we can save a lot of time on the regular Put patch. By making @@ -133,11 +139,4 @@ public interface IndexBuilder extends Stoppable { * @throws IOException */ public boolean isEnabled(Mutation m) throws IOException; - - /** - * @param m mutation that has been received by the indexer and is waiting to be indexed - * @return the ID of batch to which the Mutation belongs, or <tt>null</tt> if the mutation is not - * part of a batch. 
- */ - public byte[] getBatchId(Mutation m); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java index e3ef831..93de11e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java @@ -14,7 +14,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.phoenix.index.BaseIndexCodec; +import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; /** * Codec for creating index updates from the current state of a table. @@ -49,10 +49,11 @@ public interface IndexCodec { * @param state * the current state of the table that needs to be cleaned up. Generally, you only care about the latest * column values, for each column you are indexing for each index table. + * @param context TODO * @return the pairs of (deletes, index table name) that should be applied. * @throws IOException */ - public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException; + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException; // table state has the pending update already applied, before calling // get the new index entries @@ -68,10 +69,11 @@ public interface IndexCodec { * @param state * the current state of the table that needs to an index update Generally, you only care about the latest * column values, for each column you are indexing for each index table. 
+ * @param context TODO * @return the pairs of (updates,index table name) that should be applied. * @throws IOException */ - public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException; + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException; /** * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't @@ -88,19 +90,4 @@ public interface IndexCodec { * @throws IOException */ public boolean isEnabled(Mutation m) throws IOException; - - /** - * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of - * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently. - * <p> - * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each - * batch. Otherwise, we cannot guarantee index correctness</b> - * - * @param m - * mutation that may or may not be part of the batch - * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch. - */ - public byte[] getBatchId(Mutation m); - - public void setContext(TableState state, Mutation mutation) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java new file mode 100644 index 0000000..ee25a40 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.covered; + +public interface IndexMetaData { + public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {}; +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index f47a71a..2da5771 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -36,8 +36,6 @@ import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.scanner.ScannerBuilder; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; -import com.google.common.collect.Maps; - /** * Manage the state of the HRegion's view of the table, for the single row. * <p> @@ -58,7 +56,6 @@ public class LocalTableState implements TableState { private List<Cell> kvs = new ArrayList<Cell>(); private List<? 
extends IndexedColumnGroup> hints; private CoveredColumns columnSet; - private final Map<String,Object> context = Maps.newHashMap(); public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) { this.env = environment; @@ -132,7 +129,7 @@ public class LocalTableState implements TableState { * state for any of the columns you are indexing. * <p> * <i>NOTE:</i> This method should <b>not</b> be used during - * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been + * {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} as the pending update will not yet have been * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i> * need to track the indexed columns. * <p> @@ -275,9 +272,4 @@ public class LocalTableState implements TableState { ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey()); return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond()); } - - @Override - public Map<String, Object> getContext() { - return context; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 4ba3671..11e7d1a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -62,14 +62,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { } @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException { + public 
Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData indexMetaData) throws IOException { // create a state manager, so we can manage each batch LocalTableState state = new LocalTableState(env, localTable, mutation); - codec.setContext(state, mutation); // build the index updates for each group IndexUpdateManager manager = new IndexUpdateManager(); - batchMutationAndAddUpdates(manager, state, mutation); + batchMutationAndAddUpdates(manager, state, mutation, indexMetaData); if (LOG.isDebugEnabled()) { LOG.debug("Found index updates for Mutation: " + mutation + "\n" + manager); @@ -84,16 +83,17 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { * don't all have the timestamp, so we need to manage everything in batches based on timestamp. * <p> * Adds all the updates in the {@link Mutation} to the state, as a side-effect. - * - * @param updateMap - * index updates into which to add new updates. Modified as a side-effect. * @param state * current state of the row for the mutation. * @param m * mutation to batch + * @param indexMetaData TODO + * @param updateMap + * index updates into which to add new updates. Modified as a side-effect. + * * @throws IOException */ - private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m) throws IOException { + private void batchMutationAndAddUpdates(IndexUpdateManager manager, LocalTableState state, Mutation m, IndexMetaData indexMetaData) throws IOException { // split the mutation into timestamp-based batches Collection<Batch> batches = createTimestampBatchesFromMutation(m); @@ -106,7 +106,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { * group will see that as the current state, which will can cause the a delete and a put to be created for * the next group. 
*/ - if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) { + if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) { cleanupCurrentState = false; } } @@ -197,12 +197,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { * <tt>true</tt> if we should should attempt to cleanup the current state of the table, in the event of a * 'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier * batch already did the cleanup. + * @param indexMetaData TODO * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt> * otherwise * @throws IOException */ private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state, - boolean requireCurrentStateCleanup) throws IOException { + boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException { // need a temporary manager for the current batch. It should resolve any conflicts for the // current batch. Essentially, we can get the case where a batch doesn't change the current @@ -214,18 +215,18 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { // determine if we need to make any cleanup given the pending update. long batchTs = batch.getTimestamp(); state.setPendingUpdates(batch.getKvs()); - addCleanupForCurrentBatch(updateMap, batchTs, state); + addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData); // A.2 do a single pass first for the updates to the current state state.applyPendingUpdates(); - long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap); + long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData); // if all the updates are the latest thing in the index, we are done - don't go and fix history if (ColumnTracker.isNewestTime(minTs)) { return false; } // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the // index. 
after this, we have the correct view of the index, from the batch up to the index while (!ColumnTracker.isNewestTime(minTs)) { - minTs = addUpdateForGivenTimestamp(minTs, state, updateMap); + minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData); } // B. only cleanup the current state if we need to - its a huge waste of effort otherwise. @@ -240,7 +241,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { // cleanup the pending batch. If anything in the correct history is covered by Deletes used to // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both // because the update may have a different set of columns or value based on the update). - cleanupIndexStateFromBatchOnward(updateMap, batchTs, state); + cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData); // have to roll the state forward again, so the current state is correct state.applyPendingUpdates(); @@ -249,18 +250,18 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { return false; } - private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap) + private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData) throws IOException { state.setCurrentTimestamp(ts); - ts = addCurrentStateMutationsForBatch(updateMap, state); + ts = addCurrentStateMutationsForBatch(updateMap, state, indexMetaData); return ts; } - private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state) + private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData) throws IOException { // get the cleanup for the current state state.setCurrentTimestamp(batchTs); - addDeleteUpdatesToMap(updateMap, state, batchTs); + addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData); // ignore any index tracking from the delete 
state.resetTrackedColumns(); } @@ -271,19 +272,20 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { * * @param updateMap * to update with index mutations - * @param batch - * to apply to the current state * @param state * current state of the table + * @param indexMetaData TODO + * @param batch + * to apply to the current state * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)} * returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>. * @throws IOException */ - private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state) + private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state, IndexMetaData indexMetaData) throws IOException { // get the index updates for this current batch - Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state); + Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData); state.resetTrackedColumns(); /* @@ -346,13 +348,14 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { * @param state * current state of the primary table. Should already by setup to the correct state from which we want to * cleanup. 
+ * @param indexMetaData TODO * @throws IOException */ - private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state) + private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData) throws IOException { // get the cleanup for the current state state.setCurrentTimestamp(batchTs); - addDeleteUpdatesToMap(updateMap, state, batchTs); + addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData); Set<ColumnTracker> trackers = state.getTrackedColumns(); long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP; for (ColumnTracker tracker : trackers) { @@ -363,21 +366,22 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { state.resetTrackedColumns(); if (!ColumnTracker.isNewestTime(minTs)) { state.setHints(Lists.newArrayList(trackers)); - cleanupIndexStateFromBatchOnward(updateMap, minTs, state); + cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData); } } /** - * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then add them to the + * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the * update map. * <p> * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc). 
+ * @param indexMetaData TODO * * @throws IOException */ - protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts) + protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData) throws IOException { - Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state); + Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData); if (cleanup != null) { for (IndexUpdate d : cleanup) { if (!d.isValid()) { @@ -392,14 +396,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { } @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered) + public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException { // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows return null; } - - @Override - protected boolean useRawScanToPrimeBlockCache() { - return false; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java index b8b2f19..d8b215c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java @@ -80,6 +80,4 @@ public interface TableState { * @return the keyvalues in the pending update to the table. 
*/ Collection<Cell> getPendingUpdate(); - - Map<String,Object> getContext(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java deleted file mode 100644 index d90edc1..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.hbase.index.covered; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import co.cask.tephra.Transaction; -import co.cask.tephra.hbase98.TransactionAwareHTable; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.hbase.index.ValueGetter; -import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder; -import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; -import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -public class TxIndexBuilder extends BaseIndexBuilder { - private static final Log LOG = LogFactory.getLog(TxIndexBuilder.class); - public static final String TRANSACTION = "TRANSACTION"; - private TransactionAwareHTable txTable; - - @Override - public void setup(RegionCoprocessorEnvironment env) throws IOException { - super.setup(env); - HTableInterface htable = env.getTable(env.getRegion().getRegionInfo().getTable()); - this.txTable = new 
TransactionAwareHTable(htable); // TODO: close? - } - - @Override - public void stop(String why) { - try { - if (this.txTable != null) txTable.close(); - } catch (IOException e) { - LOG.warn("Unable to close txTable", e); - } finally { - super.stop(why); - } - } - - @Override - public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException { - // get the index updates for this current batch - TxTableState state = new TxTableState(mutation); - codec.setContext(state, mutation); - Transaction tx = (Transaction)state.getContext().get(TRANSACTION); - state.setCurrentTransaction(tx); - Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayListWithExpectedSize(2); - Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state); - for (IndexUpdate delete : deletes) { - indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); - } - state.addPendingUpdates(mutation); - // TODO: state may need to maintain the old state with the new state super imposed - // An alternate easier way would be to calculate the state after the data mutations - // have been applied. 
- Iterable<IndexUpdate> updates = codec.getIndexUpserts(state); - for (IndexUpdate update : updates) { - indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName())); - } - return indexUpdates; - } - - private class TxTableState implements TableState { - private Put put; - private Map<String, byte[]> attributes; - private List<Cell> pendingUpdates = Lists.newArrayList(); - private Transaction transaction; - private final Map<String,Object> context = Maps.newHashMap(); - - public TxTableState(Mutation m) { - this.put = new Put(m.getRow()); - this.attributes = m.getAttributesMap(); - } - - @Override - public RegionCoprocessorEnvironment getEnvironment() { - return env; - } - - @Override - public long getCurrentTimestamp() { - return transaction.getReadPointer(); - } - - @Override - public Map<String, byte[]> getUpdateAttributes() { - return attributes; - } - - @Override - public byte[] getCurrentRowKey() { - return put.getRow(); - } - - @Override - public List<? extends IndexedColumnGroup> getIndexColumnHints() { - return Collections.emptyList(); - } - - public void addPendingUpdate(Cell cell) throws IOException { - put.add(cell); - } - - public void addPendingUpdates(Mutation m) throws IOException { - if (m instanceof Delete) { - put.getFamilyCellMap().clear(); - } else { - CellScanner scanner = m.cellScanner(); - while (scanner.advance()) { - Cell cell = scanner.current(); - if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { - byte[] family = CellUtil.cloneFamily(cell); - byte[] qualifier = CellUtil.cloneQualifier(cell); - put.add(family, qualifier, HConstants.EMPTY_BYTE_ARRAY); - } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { - byte[] family = CellUtil.cloneFamily(cell); - put.getFamilyCellMap().remove(family); - } else { - put.add(cell); - } - } - } - } - - @Override - public Collection<Cell> getPendingUpdate() { - return pendingUpdates; - } - - public void setCurrentTransaction(Transaction tx) { 
- this.transaction = tx; - } - - @Override - public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns) - throws IOException { - ColumnTracker tracker = new ColumnTracker(indexedColumns); - IndexUpdate indexUpdate = new IndexUpdate(tracker); - final byte[] rowKey = getCurrentRowKey(); - if (!pendingUpdates.isEmpty()) { - final Map<ColumnReference, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates - .size()); - for (Cell kv : pendingUpdates) { - // create new pointers to each part of the kv - ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); - valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value); - } - ValueGetter getter = new ValueGetter() { - @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref) { - // TODO: from IndexMaintainer we return null if ref is empty key value. Needed? 
- return valueMap.get(ref); - } - @Override - public byte[] getRowKey() { - return rowKey; - } - }; - return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate); - } - // Establish initial state of table by finding the old values - // We'll apply the Mutation to this next - Get get = new Get(rowKey); - get.setMaxVersions(); - for (ColumnReference ref : indexedColumns) { - get.addColumn(ref.getFamily(), ref.getQualifier()); - } - txTable.startTx(transaction); - final Result result = txTable.get(get); - ValueGetter getter = new ValueGetter() { - - @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException { - Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); - if (cell == null) { - return null; - } - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - return ptr; - } - - @Override - public byte[] getRowKey() { - return rowKey; - } - - }; - for (ColumnReference ref : indexedColumns) { - Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); - if (cell != null) { - addPendingUpdate(cell); - } - } - return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate); - } - - @Override - public Map<String, Object> getContext() { - return context; - } - - } - - @Override - protected boolean useRawScanToPrimeBlockCache() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index d4bd460..59bc8de 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.scanner.Scanner; -import org.apache.phoenix.index.BaseIndexCodec; import com.google.common.collect.Lists; @@ -59,7 +60,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { } @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) { + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) { List<IndexUpdate> updates = new ArrayList<IndexUpdate>(); for (ColumnGroup group : groups) { IndexUpdate update = getIndexUpdateForGroup(group, state); @@ -113,7 +114,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { } @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) { + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(); for (ColumnGroup group : groups) { deletes.add(getDeleteForGroup(group, state)); @@ -360,7 +361,4 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { // simple check for the moment. 
return groups.size() > 0; } - - @Override - public void setContext(TableState state, Mutation mutation) throws IOException {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java index de8f752..60698c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.covered.Batch; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager; @@ -118,8 +119,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder { @Override public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows( - Collection<KeyValue> filtered) throws IOException { - + Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException { // stores all the return values IndexUpdateManager updateMap = new IndexUpdateManager(); // batch the updates by row to make life easier and ordered @@ -146,7 +146,7 @@ public class CoveredColumnIndexer extends NonTxIndexBuilder { for (Batch entry : timeBatch) { //just set the timestamp on the table - it already has all the future state state.setCurrentTimestamp(entry.getTimestamp()); 
- this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp()); + this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp(), indexMetaData); } } return updateMap.toMap(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java deleted file mode 100644 index 1c45cd3..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.index; - -import java.io.IOException; - -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; - -import org.apache.phoenix.hbase.index.covered.IndexCodec; - -/** - * - */ -public abstract class BaseIndexCodec implements IndexCodec { - - @Override - public void initialize(RegionCoprocessorEnvironment env) throws IOException { - // noop - } - - /** - * {@inheritDoc} - * <p> - * By default, the codec is always enabled. Subclasses should override this method if they want do - * decide to index on a per-mutation basis. - * @throws IOException - */ - @Override - public boolean isEnabled(Mutation m) throws IOException { - return true; - } - - /** - * {@inheritDoc} - * <p> - * Assumes each mutation is not in a batch. Subclasses that have different batching behavior - * should override this. - */ - @Override - public byte[] getBatchId(Mutation m) { - return null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index 1e28766..09a9f90 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -18,14 +18,32 @@ package org.apache.phoenix.index; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Scan; import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.write.IndexWriter; -import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; /** * Index builder for covered-columns index that ties into phoenix for faster use. @@ -33,6 +51,15 @@ import org.apache.phoenix.util.IndexUtil; public class PhoenixIndexBuilder extends NonTxIndexBuilder { @Override + public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap()); + } + + protected PhoenixIndexCodec getCodec() { + return (PhoenixIndexCodec)codec; + } + + @Override public void setup(RegionCoprocessorEnvironment env) throws IOException { super.setup(env); Configuration conf = env.getConfiguration(); @@ -43,16 +70,54 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder { } @Override - public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException { // The entire purpose of this method impl is to get the existing rows for the // table rows being indexed into the block cache, as the index 
maintenance code // does a point scan per row. - // TODO: provide a means for the transactional case to just return the Scanner - // for when this is executed as it seems like that would be more efficient. - IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache()); - } - - private PhoenixIndexCodec getCodec() { - return (PhoenixIndexCodec)this.codec; + List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers(); + List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size()); + Map<ImmutableBytesWritable, IndexMaintainer> maintainers = + new HashMap<ImmutableBytesWritable, IndexMaintainer>(); + ImmutableBytesWritable indexTableName = new ImmutableBytesWritable(); + for (int i = 0; i < miniBatchOp.size(); i++) { + Mutation m = miniBatchOp.getOperation(i); + keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow())); + + for(IndexMaintainer indexMaintainer: indexMaintainers) { + if (indexMaintainer.isImmutableRows()) continue; + indexTableName.set(indexMaintainer.getIndexTableName()); + if (maintainers.get(indexTableName) != null) continue; + maintainers.put(indexTableName, indexMaintainer); + } + + } + if (maintainers.isEmpty()) return; + Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values())); + scan.setRaw(true); + ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); + scanRanges.initializeScan(scan); + scan.setFilter(scanRanges.getSkipScanFilter()); + HRegion region = env.getRegion(); + RegionScanner scanner = region.getScanner(scan); + // Run through the scanner using internal nextRaw method + region.startRegionOperation(); + try { + synchronized (scanner) { + boolean hasMore; + do { + List<Cell> results = Lists.newArrayList(); + // Results are potentially returned even when the return value of s.next is + // false since 
this is an indication of whether or not there are more values + // after the ones returned + hasMore = scanner.nextRaw(results); + } while (hasMore); + } + } finally { + try { + scanner.close(); + } finally { + region.closeRegionOperation(); + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 571c559..1fe9931 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -10,53 +10,41 @@ package org.apache.phoenix.index; import java.io.IOException; -import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Map; -import co.cask.tephra.Transaction; - +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.cache.GlobalCache; -import org.apache.phoenix.cache.IndexMetaDataCache; -import org.apache.phoenix.cache.ServerCacheClient; -import org.apache.phoenix.cache.TenantCache; -import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; -import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; import org.apache.phoenix.hbase.index.covered.IndexCodec; +import 
org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.TableState; -import org.apache.phoenix.hbase.index.covered.TxIndexBuilder; -import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; -import org.apache.phoenix.hbase.index.scanner.Scanner; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.util.MetaDataUtil; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.ServerUtil; -import org.apache.phoenix.util.TransactionUtil; import com.google.common.collect.Lists; /** * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index ( - * {@link #getIndexDeletes(TableState)}) as well as what the new index state should be ( - * {@link #getIndexUpserts(TableState)}). + * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be ( + * {@link #getIndexUpserts(TableState, IndexMetaData)}). 
*/ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MD = "IdxMD"; public static final String INDEX_UUID = "IdxUUID"; public static final String INDEX_MAINTAINERS = "IndexMaintainers"; + private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; private RegionCoprocessorEnvironment env; - private KeyValueBuilder kvBuilder = GenericKeyValueBuilder.INSTANCE;; @Override public void initialize(RegionCoprocessorEnvironment env) throws IOException { @@ -71,67 +59,78 @@ public class PhoenixIndexCodec extends BaseIndexCodec { return true; } - public IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException { - if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } - byte[] uuid = attributes.get(INDEX_UUID); - if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } - byte[] md = attributes.get(INDEX_MD); - byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); - if (md != null) { - final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md); - final Transaction txn = TransactionUtil.decodeTxnState(txState); - return new IndexMetaDataCache() { - - @Override - public void close() throws IOException {} - - @Override - public List<IndexMaintainer> getIndexMaintainers() { - return indexMaintainers; - } - - @Override - public Transaction getTransaction() { - return txn; - } - - }; - } else { - byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB); - ImmutableBytesWritable tenantId = tenantIdBytes == null ? 
null : new ImmutableBytesWritable(tenantIdBytes); - TenantCache cache = GlobalCache.getTenantCache(env, tenantId); - IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid)); - if (indexCache == null) { - String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion(); - SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND) - .setMessage(msg).build().buildException(); - ServerUtil.throwIOException("Index update failed", e); // will not return + @Override + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException { + List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers(); + if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) { + return Collections.emptyList(); + } + // TODO: confirm that this special case isn't needed + // (as state should match this with the above call, since there are no mutable columns) + /* + if (maintainer.isImmutableRows()) { + indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns())); + indexUpdate.setTable(maintainer.getIndexTableName()); + valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate()); + } + */ + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(state.getCurrentRowKey()); + List<IndexUpdate> indexUpdates = Lists.newArrayList(); + for (IndexMaintainer maintainer : indexMaintainers) { + if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes + assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0); } - return indexCache; + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); + ValueGetter valueGetter = statePair.getFirst(); + IndexUpdate indexUpdate = statePair.getSecond(); + 
indexUpdate.setTable(maintainer.getIndexTableName()); + Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env + .getRegion().getStartKey(), env.getRegion().getEndKey()); + indexUpdate.setUpdate(put); + indexUpdates.add(indexUpdate); } + return indexUpdates; + } + @Override + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException { + List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers(); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(state.getCurrentRowKey()); + List<IndexUpdate> indexUpdates = Lists.newArrayList(); + for (IndexMaintainer maintainer : indexMaintainers) { + if (maintainer.isLocalIndex()) { // TODO: remove this once verified assert passes + assert(Bytes.compareTo(maintainer.getIndexTableName(), MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName())) == 0); + } + if (maintainer.isImmutableRows()) { + continue; + } + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns()); + ValueGetter valueGetter = statePair.getFirst(); + IndexUpdate indexUpdate = statePair.getSecond(); + indexUpdate.setTable(maintainer.getIndexTableName()); + Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), + state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey()); + indexUpdate.setUpdate(delete); + indexUpdates.add(indexUpdate); + } + return indexUpdates; } + /* @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException { - return getIndexUpdates(state, true); + public Iterable<IndexUpdate> getIndexUpserts(TableState state, BatchContext context) throws IOException { + return getIndexUpdates(state, context, true); } @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException { - return 
getIndexUpdates(state, false); + public Iterable<IndexUpdate> getIndexDeletes(TableState state, BatchContext context) throws IOException { + return getIndexUpdates(state, context, false); } - /** - * @param state - * @param upsert - * prepare index upserts if it's true otherwise prepare index deletes. - * @return - * @throws IOException - */ - private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException { - @SuppressWarnings("unchecked") - List<IndexMaintainer> indexMaintainers = (List<IndexMaintainer>)state.getContext().get(INDEX_MAINTAINERS); + private Iterable<IndexUpdate> getIndexUpdates(TableState state, BatchContext context, boolean upsert) throws IOException { + List<IndexMaintainer> indexMaintainers = ((PhoenixBatchContext)context).getIndexMetaData().getIndexMaintainers(); if (indexMaintainers.isEmpty()) { return Collections.emptyList(); } List<IndexUpdate> indexUpdates = Lists.newArrayList(); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); @@ -165,10 +164,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } Mutation mutation = null; if (upsert) { - mutation = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), env + mutation = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env .getRegion().getStartKey(), env.getRegion().getEndKey()); } else { - mutation = maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(), + mutation = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey()); } indexUpdate.setUpdate(mutation); @@ -180,25 +179,10 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } return indexUpdates; } + */ @Override public boolean isEnabled(Mutation m) throws IOException { return hasIndexMaintainers(m.getAttributesMap()); } - - @Override - public byte[] 
getBatchId(Mutation m) { - Map<String, byte[]> attributes = m.getAttributesMap(); - return attributes.get(INDEX_UUID); - } - - @Override - public void setContext(TableState state, Mutation mutation) throws IOException { - IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes()); - List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers(); - Map<String,Object> context = state.getContext(); - context.clear(); - context.put(INDEX_MAINTAINERS, indexMaintainers); - context.put(TxIndexBuilder.TRANSACTION, indexCache.getTransaction()); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java new file mode 100644 index 0000000..26c1c12 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.index; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +import co.cask.tephra.Transaction; + +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.cache.IndexMetaDataCache; +import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.TransactionUtil; + +public class PhoenixIndexMetaData implements IndexMetaData { + private final IndexMetaDataCache indexMetaDataCache; + + private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException { + if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } + byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID); + if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } + byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD); + byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); + if (md != null) { + final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md); + final Transaction txn = TransactionUtil.decodeTxnState(txState); + return new IndexMetaDataCache() { + + @Override + public void close() throws IOException {} + + @Override + public List<IndexMaintainer> getIndexMaintainers() { + return indexMaintainers; + } + + @Override + public Transaction getTransaction() { + 
return txn; + } + + }; + } else { + byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB); + ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes); + TenantCache cache = GlobalCache.getTenantCache(env, tenantId); + IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid)); + if (indexCache == null) { + String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion(); + SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND) + .setMessage(msg).build().buildException(); + ServerUtil.throwIOException("Index update failed", e); // will not return + } + return indexCache; + } + + } + + public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException { + indexMetaDataCache = getIndexMetaData(env, attributes); + } + + public Transaction getTransaction() { + return indexMetaDataCache.getTransaction(); + } + + public List<IndexMaintainer> getIndexMaintainers() { + return indexMetaDataCache.getIndexMaintainers(); + } +}
