Author: kturner
Date: Thu Jan 17 16:28:48 2013
New Revision: 1434762
URL: http://svn.apache.org/viewvc?rev=1434762&view=rev
Log:
ACCUMULO-956 checkin of patch from Brian Loss
Added:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
Added:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java?rev=1434762&view=auto
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
(added)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
Thu Jan 17 16:28:48 2013
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.OptionDescriber;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.VisibilityEvaluator;
+import org.apache.accumulo.core.security.VisibilityParseException;
+import org.apache.accumulo.core.util.BadArgumentException;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * The KeyTransformingIterator allows portions of a key (except for the row)
+ * to be transformed. This iterator handles the details that come with
modifying
+ * keys (i.e., that the sort order could change). In order to do so, however,
+ * the iterator must put all keys sharing the same prefix in memory. Prefix
+ * is defined as the parts of the key that are not modified by this iterator.
+ * That is, if the iterator modifies column qualifier and timestamp, then the
+ * prefix is row and column family. In that case, the iterator must load all
+ * column qualifiers for each row/column family pair into memory. Given this
+ * constraint, care must be taken by users of this iterator to ensure it is
+ * not run in such a way that will overrun memory in a tablet server.
+ * <p>
+ * If the implementing iterator is transforming column families, then it
+ * must also override {@link #untransformColumnFamilies(Collection)} to handle
+ * the case when column families are fetched at scan time. The fetched column
+ * families will/must be in the transformed space, and the untransformed column
+ * families need to be passed to this iterator's source. If it is not possible
+ * to write a reverse transformation (e.g., the column family transformation
+ * depends on the row value or something like that), then the iterator must
+ * not fetch specific column families (or only fetch column families that are
+ * known to not transform at all).
+ * <p>
+ * If the implementing iterator is transforming column visibilities, then
+ * users must be careful NOT to fetch column qualifiers from the scanner.
+ * The reason for this is due to ACCUMULO-??? (insert issue number).
+ * <p>
+ * If the implementing iterator is transforming column visibilities, then the
+ * user should be sure to supply authorizations via the {@link #AUTH_OPT}
+ * iterator option (note that this is only necessary for scan scope iterators).
+ * The supplied authorizations should be in the transformed space, but the
+ * authorizations supplied to the scanner should be in the untransformed
+ * space. That is, if the iterator transforms A to 1, B to 2, C to 3, etc,
+ * then the auths supplied when the scanner is constructed should be A,B,C,...
+ * and the auths supplied to the iterator should be 1,2,3,... The reason
+ * for this is that the scanner performs security filtering before this
+ * iterator is called, so the authorizations need to be in the original
+ * untransformed space. Since the iterator can transform visibilities, it is
+ * possible that it could produce visibilities that the user cannot see,
+ * so the transformed keys must be tested to ensure the user is allowed to
view
+ * them. Note that this test is not necessary when the iterator is not used
+ * in the scan scope since no security filtering is performed during major and
+ * minor compactions. It should also be noted that this iterator implements
the
+ * security filtering rather than relying on a follow-on iterator to do it so
+ * that we ensure the test is performed.
+ */
+abstract public class KeyTransformingIterator extends WrappingIterator
implements OptionDescriber {
+ // Name of the iterator option that init() reads to obtain the comma-separated
+ // authorizations used for checking transformed visibilities.
+ public static final String AUTH_OPT = "authorizations";
+ // Logger named after the concrete (sub)class, since it is created via getClass().
+ protected Logger log = Logger.getLogger(getClass());
+
+ // Transformed key/value pairs for the prefix currently being served; filled
+ // and sorted by transformKeys().
+ protected ArrayList<Pair<Key,Value>> keys = new ArrayList<Pair<Key,Value>>();
+ // Index into keys of the current top entry; -1 means there is no top entry.
+ protected int keyPos = -1;
+ // True when init() was called with the scan iterator scope.
+ protected boolean scanning;
+ // Copy of the range passed to the most recent seek() (null if that range was null).
+ protected Range seekRange;
+ // Column families passed to the most recent seek(), and whether that set is
+ // inclusive (fetch only these) or exclusive (fetch everything but these).
+ protected Collection<ByteSequence> seekColumnFamilies;
+ protected boolean seekColumnFamiliesInclusive;
+
+ // Evaluates visibilities against the AUTH_OPT authorizations; stays null when
+ // not scanning or when no authorizations were supplied.
+ private VisibilityEvaluator ve = null;
+ // LRU cache from visibility bytes to the Boolean result of evaluating them with ve.
+ private LRUMap visibleCache = null;
+ // LRU cache consulted by canSee(), keyed by visibility bytes, holding parsed
+ // ColumnVisibility objects.
+ private LRUMap parsedVisibilitiesCache = null;
+
+ private static Comparator<Pair<Key,Value>> keyComparator = new
Comparator<Pair<Key,Value>>() {
+ @Override
+ public int compare(Pair<Key,Value> o1, Pair<Key,Value> o2) {
+ return o1.getFirst().compareTo(o2.getFirst());
+ }
+ };
+
+ /**
+  * No-argument constructor. {@link #deepCopy(IteratorEnvironment)} creates
+  * copies reflectively via {@code getClass().newInstance()}, so concrete
+  * subclasses need an accessible no-argument constructor as well.
+  */
+ public KeyTransformingIterator() {
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ scanning = IteratorScope.scan.equals(env.getIteratorScope());
+ if (scanning) {
+ String auths = options.get(AUTH_OPT);
+ if (auths != null && !auths.isEmpty()) {
+ ve = new VisibilityEvaluator(new Authorizations(auths.split(",")));
+ visibleCache = new LRUMap(100);
+ }
+ }
+
+ parsedVisibilitiesCache = new LRUMap(100);
+ }
+
+  /**
+   * Describes this iterator and its single named option, {@link #AUTH_OPT}.
+   *
+   * @return the option description, named after the concrete class
+   */
+  @Override
+  public IteratorOptions describeOptions() {
+    String iteratorName = getClass().getSimpleName();
+    String description = "This iterator allows keys to be transformed.";
+    String authDescription = "Comma-separated list of user's scan authorizations. "
+        + "If excluded or empty, then no visibility check is performed on transformed keys.";
+    Map<String,String> namedOptions = Collections.singletonMap(AUTH_OPT, authDescription);
+    return new IteratorOptions(iteratorName, description, namedOptions, null);
+  }
+
+ /**
+  * Accepts any option map; no validation is performed on the supplied options.
+  *
+  * @param options the options to validate (not inspected)
+  * @return always {@code true}
+  */
+ @Override
+ public boolean validateOptions(Map<String,String> options) {
+ return true;
+ }
+
+  /**
+   * Creates an independent copy of this iterator: a new instance of the same
+   * concrete class, reading from a deep copy of our source, with the buffered
+   * keys, seek state, and visibility caches duplicated.
+   *
+   * @param env the environment handed to the source's own deepCopy
+   * @return the copied iterator
+   * @throws RuntimeException if the concrete class cannot be instantiated reflectively
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    KeyTransformingIterator duplicate;
+    try {
+      duplicate = getClass().newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    duplicate.setSource(getSource().deepCopy(env));
+
+    // Position within the buffered, already transformed keys.
+    duplicate.scanning = scanning;
+    duplicate.keyPos = keyPos;
+    duplicate.keys.addAll(keys);
+
+    // State remembered from the last seek.
+    if (seekRange == null) {
+      duplicate.seekRange = null;
+    } else {
+      duplicate.seekRange = new Range(seekRange);
+    }
+    if (seekColumnFamilies == null) {
+      duplicate.seekColumnFamilies = null;
+    } else {
+      duplicate.seekColumnFamilies = new HashSet<ByteSequence>(seekColumnFamilies);
+    }
+    duplicate.seekColumnFamiliesInclusive = seekColumnFamiliesInclusive;
+
+    // The evaluator is shared; each cache gets its own map with the same capacity.
+    duplicate.ve = ve;
+    if (visibleCache != null) {
+      LRUMap visibleCopy = new LRUMap(visibleCache.maxSize());
+      duplicate.visibleCache = visibleCopy;
+      visibleCopy.putAll(visibleCache);
+    }
+    if (parsedVisibilitiesCache != null) {
+      LRUMap parsedCopy = new LRUMap(parsedVisibilitiesCache.maxSize());
+      duplicate.parsedVisibilitiesCache = parsedCopy;
+      parsedCopy.putAll(parsedVisibilitiesCache);
+    }
+
+    return duplicate;
+  }
+
+  /** @return {@code true} while {@code keyPos} points at a buffered transformed entry */
+  @Override
+  public boolean hasTop() {
+    if (keyPos < 0) {
+      return false;
+    }
+    return keyPos < keys.size();
+  }
+
+  /** @return the current transformed key, or {@code null} when there is no top entry */
+  @Override
+  public Key getTopKey() {
+    if (!hasTop()) {
+      return null;
+    }
+    return keys.get(keyPos).getFirst();
+  }
+
+  /** @return the value paired with the current transformed key, or {@code null} when there is no top entry */
+  @Override
+  public Value getTopValue() {
+    if (!hasTop()) {
+      return null;
+    }
+    return keys.get(keyPos).getSecond();
+  }
+
+  /**
+   * Advances to the next transformed entry, pulling and transforming further
+   * prefix groups from the source whenever the buffered list runs out.
+   */
+  @Override
+  public void next() throws IOException {
+    // The entry at keyPos has already been handed out, so step past it.
+    if (keyPos >= 0) {
+      ++keyPos;
+    }
+
+    // Once the buffered list is used up, transform the next group of keys from
+    // the source. It's possible that a transformation yields only keys that lie
+    // outside our range or are not visible to the user, leaving the list empty,
+    // so keep going until we have something to return or the source is exhausted.
+    while (!hasTop() && super.hasTop()) {
+      transformKeys();
+    }
+  }
+
+  /**
+   * Seeks the source to a widened range covering every key that shares a
+   * prefix with the requested range, then buffers the first prefix group
+   * that yields at least one transformed key.
+   *
+   * @param range the requested range, expressed in the transformed key space
+   * @param columnFamilies the fetched column families, in the transformed space
+   * @param inclusive whether {@code columnFamilies} is an inclusive set
+   */
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    // Remember the request so transformed keys can be filtered against it later.
+    if (range == null) {
+      seekRange = null;
+    } else {
+      seekRange = new Range(range);
+    }
+    seekColumnFamilies = columnFamilies;
+    seekColumnFamiliesInclusive = inclusive;
+
+    // Transforming can change the sort order, so a key that sorts before the
+    // range start may land inside the range once transformed. Seek the source
+    // with a recalculated range that covers every key sharing the same "prefix".
+    Range sourceRange = computeReseekRange(range);
+    Collection<ByteSequence> sourceFamilies = untransformColumnFamilies(columnFamilies);
+    super.seek(sourceRange, sourceFamilies, inclusive);
+
+    // "Clear" the list so hasTop() returns false and we enter the loop
+    // (transformKeys() does the actual clearing).
+    keyPos = -1;
+
+    // Range clipping may discard every key of a prefix group, so keep building
+    // sorted groups until one contributes output or the source is exhausted.
+    // Callers are served from that list before the source is advanced again.
+    while (!hasTop() && super.hasTop()) {
+      transformKeys();
+    }
+  }
+
+  /**
+   * Consumes from the source every key that shares the prefix of the source's
+   * current top key, transforms each one, and leaves the surviving results
+   * sorted in {@code keys}. Transformed keys outside the seek range, or not
+   * visible to the user, are dropped.
+   */
+  protected void transformKeys() throws IOException {
+    keyPos = -1;
+    keys.clear();
+
+    Key prefixKey = null;
+    if (super.hasTop()) {
+      prefixKey = new Key(super.getTopKey());
+    }
+
+    while (super.hasTop()) {
+      Key sourceTopKey = super.getTopKey();
+
+      // A source key that no longer matches the prefix ends this group.
+      if (!sourceTopKey.equals(prefixKey, getKeyPrefix())) {
+        break;
+      }
+
+      Key transformedKey = transformKey(sourceTopKey);
+      // An untouched key is the source's own object; copy it before caching it in our list.
+      // (We could also verify here that nothing inside the prefix was transformed.)
+      if (transformedKey == sourceTopKey) {
+        transformedKey = new Key(sourceTopKey);
+      }
+
+      // The result may fall outside the seek range or be invisible to the user;
+      // only keep it when it passes those checks.
+      if (includeTransformedKey(transformedKey)) {
+        Value valueCopy = new Value(super.getTopValue());
+        keys.add(new Pair<Key,Value>(transformedKey, valueCopy));
+      }
+
+      super.next();
+    }
+
+    if (keys.isEmpty()) {
+      return;
+    }
+    Collections.sort(keys, keyComparator);
+    keyPos = 0;
+  }
+
+  /**
+   * Decides whether {@code transformedKey} belongs in the output. A
+   * transformation may produce a key outside the seek range, a key whose
+   * visibility the user cannot see or that does not parse, or a key whose
+   * column family was not fetched. The range check is applied only in the
+   * scan scope: during major/minor compaction the seek ranges are not in the
+   * transformed key space, and since the row is never changed a transformed
+   * key cannot leave the tablet anyway.
+   *
+   * @param transformedKey the key to check
+   * @return {@code true} if the key should be included and {@code false} if not
+   */
+  protected boolean includeTransformedKey(Key transformedKey) {
+    if (!canSee(transformedKey)) {
+      return false;
+    }
+    if (!scanning || seekRange == null) {
+      return true;
+    }
+    return seekRange.contains(transformedKey);
+  }
+
+  /**
+   * Indicates whether or not the user is able to see {@code key}. The key's
+   * column visibility (which may have been transformed) is always parsed
+   * first; a visibility that does not parse is logged and the parse failure
+   * is rethrown. Next, the key's column family is tested against the fetched
+   * column families. If the user has not supplied authorizations, or the
+   * iterator is not in the scan scope, that column family test is the result.
+   * Otherwise {@code key}'s column visibility is evaluated against the
+   * user-supplied authorizations. For performance, both the parsed
+   * visibilities and the evaluation results are cached so the same visibility
+   * is not parsed or tested multiple times.
+   *
+   * @param key the key to test
+   * @return {@code true} if the key is visible or iterator is not scanning,
+   *         and {@code false} if not
+   * @throws BadArgumentException if the key's column visibility cannot be parsed
+   */
+  protected boolean canSee(Key key) {
+    // Ensure that the visibility (which could have been transformed) parses.
+    ByteSequence visibility = key.getColumnVisibilityData();
+    ColumnVisibility colVis = (ColumnVisibility) parsedVisibilitiesCache.get(visibility);
+    if (colVis == null) {
+      try {
+        colVis = new ColumnVisibility(visibility.toArray());
+      } catch (BadArgumentException e) {
+        log.error("Transformation produced an invalid visibility: " + visibility);
+        throw e;
+      }
+      // Remember the successful parse. Without this the cache is never filled
+      // and every key's visibility gets parsed again.
+      parsedVisibilitiesCache.put(visibility, colVis);
+    }
+
+    boolean visible = canSeeColumnFamily(key);
+
+    // Authorization filtering only applies to scans for which auths were supplied.
+    if (!scanning || !visible || ve == null || visibleCache == null)
+      return visible;
+
+    Boolean cachedResult = (Boolean) visibleCache.get(visibility);
+    if (cachedResult == null) {
+      try {
+        cachedResult = ve.evaluate(colVis);
+        visibleCache.put(visibility, cachedResult);
+      } catch (VisibilityParseException e) {
+        log.error("Parse Error", e);
+        cachedResult = Boolean.FALSE;
+      } catch (BadArgumentException e) {
+        log.error("Parse Error", e);
+        cachedResult = Boolean.FALSE;
+      }
+    }
+
+    return cachedResult;
+  }
+
+  /**
+   * Tests {@code key}'s column family against the column families remembered
+   * from the last {@link #seek(Range, Collection, boolean)} call.
+   *
+   * @param key the key whose column family is to be tested
+   * @return {@code true} if no column families were recorded; otherwise
+   *         {@code true} when the family is in an inclusive set, or absent
+   *         from an exclusive set
+   */
+  protected boolean canSeeColumnFamily(Key key) {
+    if (seekColumnFamilies == null) {
+      return true;
+    }
+    boolean listed = seekColumnFamilies.contains(key.getColumnFamilyData());
+    // Inclusive set: must be listed. Exclusive set: must not be listed.
+    return seekColumnFamiliesInclusive == listed;
+  }
+
+  /**
+   * Widens {@code range}, where necessary, so that it covers every entry
+   * sharing the key prefix of its start and end keys. For example, with a
+   * prefix of ROW_COLFAM the result includes all entries having the same row
+   * and column family as the start/end of the range.
+   *
+   * @param range the range to expand
+   * @return the modified range
+   */
+  protected Range computeReseekRange(Range range) {
+    Key lowerBound = range.getStartKey();
+    boolean lowerInclusive = range.isStartKeyInclusive();
+    if (isSetAfterPart(lowerBound, getKeyPrefix())) {
+      // Something beyond the prefix is set: clip back to the bare prefix so
+      // that the whole prefix group is included.
+      lowerBound = copyPartialKey(lowerBound, getKeyPrefix());
+      lowerInclusive = true;
+    }
+
+    Key upperBound = range.getEndKey();
+    boolean upperInclusive = range.isEndKeyInclusive();
+    if (isSetAfterPart(upperBound, getKeyPrefix())) {
+      // Likewise push the end out to the key following the prefix.
+      upperBound = upperBound.followingKey(getKeyPrefix());
+      upperInclusive = true;
+    }
+
+    return new Range(lowerBound, lowerInclusive, upperBound, upperInclusive);
+  }
+
+  /**
+   * Reports whether any component of {@code key} beyond {@code part} is set.
+   * For example, if part is ROW_COLFAM_COLQUAL, this checks whether the
+   * column visibility, timestamp, or delete flag is set on {@code key}.
+   *
+   * @param key the key to check; {@code null} yields {@code false}
+   * @param part the part of the key that doesn't need to be checked (everything after does)
+   * @return {@code true} if anything after {@code part} is set on {@code key}, and {@code false} if not
+   */
+  protected boolean isSetAfterPart(Key key, PartialKey part) {
+    if (key == null) {
+      return false;
+    }
+    // Every case deliberately falls through so that all later components are examined too.
+    switch (part) {
+      case ROW:
+        if (key.getColumnFamilyData().length() > 0) {
+          return true;
+        }
+      case ROW_COLFAM:
+        if (key.getColumnQualifierData().length() > 0) {
+          return true;
+        }
+      case ROW_COLFAM_COLQUAL:
+        if (key.getColumnVisibilityData().length() > 0) {
+          return true;
+        }
+      case ROW_COLFAM_COLQUAL_COLVIS:
+        if (key.getTimestamp() < Long.MAX_VALUE) {
+          return true;
+        }
+      case ROW_COLFAM_COLQUAL_COLVIS_TIME:
+        return key.isDeleted();
+      case ROW_COLFAM_COLQUAL_COLVIS_TIME_DEL:
+        return false;
+    }
+    return false;
+  }
+
+  /**
+   * Builds a new key from only the components of {@code key} named by
+   * {@code part}. For example, ROW_COLFAM_COLQUAL copies the row, column
+   * family, and column qualifier into the new key.
+   *
+   * @param key the key to copy
+   * @param part the parts of {@code key} to copy
+   * @return the new key containing {@code part} of {@code key}
+   * @throws IllegalArgumentException if {@code part} is not one of the handled parts
+   */
+  protected Key copyPartialKey(Key key, PartialKey part) {
+    switch (part) {
+      case ROW:
+        return new Key(key.getRow());
+      case ROW_COLFAM:
+        return new Key(key.getRow(), key.getColumnFamily());
+      case ROW_COLFAM_COLQUAL:
+        return new Key(key.getRow(), key.getColumnFamily(), key.getColumnQualifier());
+      case ROW_COLFAM_COLQUAL_COLVIS:
+        return new Key(key.getRow(), key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibility());
+      case ROW_COLFAM_COLQUAL_COLVIS_TIME:
+        return new Key(key.getRow(), key.getColumnFamily(), key.getColumnQualifier(),
+            key.getColumnVisibility(), key.getTimestamp());
+      default:
+        throw new IllegalArgumentException("Unsupported key part: " + part);
+    }
+  }
+
+ /**
+ * Make a new key with all parts (including delete flag) coming from {@code
originalKey} but
+ * use {@code newColFam} as the column family.
+ */
+ protected Key replaceColumnFamily(Key originalKey, Text newColFam) {
+ byte[] row = originalKey.getRowData().toArray();
+ byte[] cf = newColFam.getBytes();
+ byte[] cq = originalKey.getColumnQualifierData().toArray();
+ byte[] cv = originalKey.getColumnVisibilityData().toArray();
+ long timestamp = originalKey.getTimestamp();
+ Key newKey = new Key(row, 0, row.length,
+ cf, 0, newColFam.getLength(),
+ cq, 0, cq.length,
+ cv, 0, cv.length,
+ timestamp);
+ newKey.setDeleted(originalKey.isDeleted());
+ return newKey;
+ }
+
+ /**
+ * Make a new key with all parts (including delete flag) coming from {@code
originalKey} but
+ * use {@code newColQual} as the column qualifier.
+ */
+ protected Key replaceColumnQualifier(Key originalKey, Text newColQual) {
+ byte[] row = originalKey.getRowData().toArray();
+ byte[] cf = originalKey.getColumnFamilyData().toArray();
+ byte[] cq = newColQual.getBytes();
+ byte[] cv = originalKey.getColumnVisibilityData().toArray();
+ long timestamp = originalKey.getTimestamp();
+ Key newKey = new Key(row, 0, row.length,
+ cf, 0, cf.length,
+ cq, 0, newColQual.getLength(),
+ cv, 0, cv.length,
+ timestamp);
+ newKey.setDeleted(originalKey.isDeleted());
+ return newKey;
+ }
+
+ /**
+ * Make a new key with all parts (including delete flag) coming from {@code
originalKey} but
+ * use {@code newColVis} as the column visibility.
+ */
+ protected Key replaceColumnVisibility(Key originalKey, Text newColVis) {
+ byte[] row = originalKey.getRowData().toArray();
+ byte[] cf = originalKey.getColumnFamilyData().toArray();
+ byte[] cq = originalKey.getColumnQualifierData().toArray();
+ byte[] cv = newColVis.getBytes();
+ long timestamp = originalKey.getTimestamp();
+ Key newKey = new Key(row, 0, row.length,
+ cf, 0, cf.length,
+ cq, 0, cq.length,
+ cv, 0, newColVis.getLength(),
+ timestamp);
+ newKey.setDeleted(originalKey.isDeleted());
+ return newKey;
+ }
+
+ /**
+ * Make a new key with a column family, column qualifier, and column
visibility.
+ * Copy the rest of the parts of the key (including delete flag) from {@code
originalKey}.
+ */
+ protected Key replaceKeyParts(Key originalKey, Text newColFam, Text
newColQual, Text newColVis) {
+ byte[] row = originalKey.getRowData().toArray();
+ byte[] cf = newColFam.getBytes();
+ byte[] cq = newColQual.getBytes();
+ byte[] cv = newColVis.getBytes();
+ long timestamp = originalKey.getTimestamp();
+ Key newKey = new Key(row, 0, row.length,
+ cf, 0, newColFam.getLength(),
+ cq, 0, newColQual.getLength(),
+ cv, 0, newColVis.getLength(),
+ timestamp);
+ newKey.setDeleted(originalKey.isDeleted());
+ return newKey;
+ }
+
+ /**
+ * Make a new key with a column qualifier, and column visibility. Copy the
rest
+ * of the parts of the key (including delete flag) from {@code originalKey}.
+ */
+ protected Key replaceKeyParts(Key originalKey, Text newColQual, Text
newColVis) {
+ byte[] row = originalKey.getRowData().toArray();
+ byte[] cf = originalKey.getColumnFamilyData().toArray();
+ byte[] cq = newColQual.getBytes();
+ byte[] cv = newColVis.getBytes();
+ long timestamp = originalKey.getTimestamp();
+ Key newKey = new Key(row, 0, row.length,
+ cf, 0, cf.length,
+ cq, 0, newColQual.getLength(),
+ cv, 0, newColVis.getLength(),
+ timestamp);
+ newKey.setDeleted(originalKey.isDeleted());
+ return newKey;
+ }
+
+ /**
+ * Reverses the transformation applied to column families that are fetched
+ * at seek time. If this iterator is transforming column families, then this
+ * method should be overridden to reverse the transformation on the supplied
+ * collection of column families. This is necessary since the fetch/seek will
+ * be performed in the transformed space, but when passing the column family
+ * set on to the source, the column families need to be in the untransformed
+ * space.
+ * <p>
+ * This default implementation returns {@code columnFamilies} unchanged.
+ *
+ * @param columnFamilies the column families that have been fetched at seek time
+ * @return the untransformed column families that would transform into
+ * {@code columnFamilies}
+ */
+ protected Collection<ByteSequence>
untransformColumnFamilies(Collection<ByteSequence> columnFamilies) {
+ return columnFamilies;
+ }
+
+ /**
+ * Indicates the prefix of keys that will be transformed by this iterator.
+ * In other words, this is the part of the key that will <i>not</i> be
+ * transformed by this iterator. For example, if this method returns
+ * ROW_COLFAM, then {@link #transformKey(Key)} may be changing the
+ * column qualifier, column visibility, or timestamp, but it won't be
+ * changing the row or column family.
+ * <p>
+ * The returned prefix is what this class uses to group source keys for
+ * transformation and to widen the seek range passed to the source.
+ *
+ * @return the part of the key this iterator is not transforming
+ */
+ abstract protected PartialKey getKeyPrefix();
+
+ /**
+ * Transforms {@code originalKey}. This method must not change the row
+ * part of the key, and must only change the parts of the key after the
+ * return value of {@link #getKeyPrefix()}. Implementors must also remember
+ * to copy the delete flag from {@code originalKey} onto the new key. Or,
+ * implementors should use one of the helper methods to produce the new
+ * key. See any of the replaceKeyParts methods.
+ * <p>
+ * Returning {@code originalKey} itself is allowed when nothing changes; the
+ * caller copies the key in that case before caching it.
+ *
+ * @param originalKey the key to be transformed
+ * @return the modified key
+ * @see #replaceColumnFamily(Key, Text)
+ * @see #replaceColumnQualifier(Key, Text)
+ * @see #replaceColumnVisibility(Key, Text)
+ * @see #replaceKeyParts(Key, Text, Text)
+ * @see #replaceKeyParts(Key, Text, Text, Text)
+ */
+ abstract protected Key transformKey(Key originalKey);
+}
Added:
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java?rev=1434762&view=auto
==============================================================================
---
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
(added)
+++
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
Thu Jan 17 16:28:48 2013
@@ -0,0 +1,539 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KeyTransformingIteratorTest {
+ private static final String TABLE_NAME = "test_table";
+ private static Authorizations authorizations = new Authorizations("vis0",
"vis1", "vis2", "vis3", "vis4");
+ private Connector connector;
+ private Scanner scanner;
+
+  /**
+   * Creates a fresh mock table populated with the default mutations for
+   * row1..row3 and a scanner over it that already has the ReuseIterator
+   * attached at priority 20.
+   */
+  @Before
+  public void setUpMockAccumulo() throws Exception {
+    MockInstance mockInstance = new MockInstance("test");
+    connector = mockInstance.getConnector("user", "password");
+    connector.securityOperations().changeUserAuthorizations("user", authorizations);
+
+    // Start every test from an empty table.
+    if (connector.tableOperations().exists(TABLE_NAME)) {
+      connector.tableOperations().delete(TABLE_NAME);
+    }
+    connector.tableOperations().create(TABLE_NAME);
+
+    BatchWriterConfig writerConfig = new BatchWriterConfig();
+    writerConfig.setMaxWriteThreads(1);
+    BatchWriter writer = connector.createBatchWriter(TABLE_NAME, writerConfig);
+    for (String row : new String[] {"row1", "row2", "row3"}) {
+      writer.addMutation(createDefaultMutation(row));
+    }
+    writer.flush();
+    writer.close();
+
+    scanner = connector.createScanner(TABLE_NAME, authorizations);
+    IteratorSetting reuseSetting = new IteratorSetting(20, ReuseIterator.class);
+    scanner.addScanIterator(reuseSetting);
+  }
+
+  /**
+   * Attaches {@code clazz} to the scanner at priority 21 under the name
+   * "keyTransformIter", with the transformed-space authorizations set.
+   */
+  private void setUpTransformIterator(Class<? extends KeyTransformingIterator> clazz) {
+    IteratorSetting transformSetting = new IteratorSetting(21, clazz);
+    transformSetting.setName("keyTransformIter");
+    transformSetting.addOption(KeyTransformingIterator.AUTH_OPT, "vis0, vis1, vis2, vis3");
+    scanner.addScanIterator(transformSetting);
+  }
+
+  @Test
+  public void testIdentityScan() throws Exception {
+    setUpTransformIterator(IdentityKeyTransformingIterator.class);
+
+    // Nothing is transformed here, but the "reuse" iterator underneath hands
+    // back the same key/value objects from every getTopKey/getTopValue call.
+    // If the iterator under test failed to copy the original key, every
+    // returned entry would end up being the final key/value.
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int rowNum = 1; rowNum <= 3; ++rowNum) {
+      for (int famNum = 1; famNum <= 3; ++famNum) {
+        for (int qualNum = 1; qualNum <= 3; ++qualNum) {
+          for (int visNum = 1; visNum <= 3; ++visNum) {
+            putExpected(expected, rowNum, famNum, qualNum, visNum, null);
+          }
+        }
+      }
+    }
+
+    checkExpected(expected);
+  }
+
+  @Test
+  public void testNoRangeScan() throws Exception {
+    @SuppressWarnings("unchecked")
+    List<Class<? extends ReversingKeyTransformingIterator>> iteratorClasses = Arrays.asList(
+        ColFamReversingKeyTransformingIterator.class,
+        ColQualReversingKeyTransformingIterator.class,
+        ColVisReversingKeyTransformingIterator.class);
+
+    // Exercise transformation of the col fam, col qual, and col vis in turn.
+    for (Class<? extends ReversingKeyTransformingIterator> iteratorClass : iteratorClasses) {
+      scanner.removeScanIterator("keyTransformIter");
+      setUpTransformIterator(iteratorClass);
+
+      // Expect every row, with the part after the iterator's prefix reversed.
+      KeyTransformingIterator prototype = iteratorClass.newInstance();
+      TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+      for (int rowNum = 1; rowNum <= 3; ++rowNum) {
+        for (int famNum = 1; famNum <= 3; ++famNum) {
+          for (int qualNum = 1; qualNum <= 3; ++qualNum) {
+            for (int visNum = 1; visNum <= 3; ++visNum) {
+              putExpected(expected, rowNum, famNum, qualNum, visNum, prototype.getKeyPrefix());
+            }
+          }
+        }
+      }
+
+      checkExpected(expected);
+    }
+  }
+
+  @Test
+  public void testVisbilityFiltering() throws Exception {
+    // The produced visibilities can't be seen, so nothing should come back.
+    setUpTransformIterator(BadVisKeyTransformingIterator.class);
+    TreeMap<Key,Value> nothing = new TreeMap<Key,Value>();
+    checkExpected(nothing);
+
+    // Now "reverse" the visibility (vis1 -> vis2, vis2 -> vis3, vis3 -> vis0).
+    // The source data only has vis1, vis2, vis3, so vis0 is newly introduced.
+    // It must show up in the output because the default test auths include vis0.
+    scanner.removeScanIterator("keyTransformIter");
+    setUpTransformIterator(ColVisReversingKeyTransformingIterator.class);
+
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    for (int rowNum = 1; rowNum <= 3; ++rowNum) {
+      for (int famNum = 1; famNum <= 3; ++famNum) {
+        for (int qualNum = 1; qualNum <= 3; ++qualNum) {
+          for (int visNum = 1; visNum <= 3; ++visNum) {
+            putExpected(expected, rowNum, famNum, qualNum, visNum, PartialKey.ROW_COLFAM_COLQUAL);
+          }
+        }
+      }
+    }
+    checkExpected(expected);
+  }
+
+  @Test
+  public void testRangeStart() throws Exception {
+    setUpTransformIterator(ColVisReversingKeyTransformingIterator.class);
+
+    Key rangeStart = new Key("row1", "cf2", "cq2", "vis1");
+    Key rangeEnd = new Key("row1", "cf2", "cq3");
+    scanner.setRange(new Range(rangeStart, true, rangeEnd, false));
+
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    // This first entry sorts before the range start, but transforms into the range.
+    putExpected(expected, 1, 2, 2, 1, PartialKey.ROW_COLFAM_COLQUAL);
+    putExpected(expected, 1, 2, 2, 2, PartialKey.ROW_COLFAM_COLQUAL);
+
+    checkExpected(expected);
+  }
+
+ /**
+  * Scans with a range whose exclusive end key carries a visibility and checks
+  * that entries transforming to that visibility are left out.
+  */
+ @Test
+ public void testRangeEnd() throws Exception {
+   setUpTransformIterator(ColVisReversingKeyTransformingIterator.class);
+   Key startInclusive = new Key("row1", "cf2", "cq2");
+   Key endExclusive = new Key("row1", "cf2", "cq2", "vis2");
+   scanner.setRange(new Range(startInclusive, true, endExclusive, false));
+
+   // The source entry with vis1 is deliberately not expected: it transforms
+   // to vis2, which lies outside the range end.
+   TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+   putExpected(expected, 1, 2, 2, 2, PartialKey.ROW_COLFAM_COLQUAL);
+   putExpected(expected, 1, 2, 2, 3, PartialKey.ROW_COLFAM_COLQUAL);
+
+   checkExpected(expected);
+ }
+
+ /**
+  * Scans a range that lies before all untransformed data and expects the
+  * entries whose column family transforms into that range.
+  */
+ @Test
+ public void testPrefixRange() throws Exception {
+   setUpTransformIterator(ColFamReversingKeyTransformingIterator.class);
+
+   // No untransformed column family falls in [cf0, cf1); source column family
+   // cf3 transforms to cf0 and therefore lands inside the range.
+   Key startInclusive = new Key("row1", "cf0");
+   Key endExclusive = new Key("row1", "cf1");
+   scanner.setRange(new Range(startInclusive, true, endExclusive, false));
+
+   TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+   for (int cq = 1; cq <= 3; ++cq) {
+     for (int cv = 1; cv <= 3; ++cv) {
+       putExpected(expected, 1, 3, cq, cv, PartialKey.ROW);
+     }
+   }
+   checkExpected(expected);
+ }
+
+ @Test
+ public void testPostfixRange() throws Exception {
+ // Set a range that's after all data and make sure we don't
+ // somehow return something.
+ setUpTransformIterator(ColFamReversingKeyTransformingIterator.class);
+ scanner.setRange(new Range(new Key("row4"), null));
+ checkExpected(new TreeMap<Key,Value>());
+ }
+
+ /**
+  * Exercises the key-part replacement helpers on a key flagged as deleted and
+  * checks that each result equals a deleted key with only the requested
+  * part(s) changed.
+  */
+ @Test
+ public void testReplaceKeyParts() throws Exception {
+   KeyTransformingIterator it = new IdentityKeyTransformingIterator();
+   Key originalKey = new Key("r", "cf", "cq", "cv", 42);
+   originalKey.setDeleted(true);
+
+   // Single-part replacements.
+   Key withNewFamily = it.replaceColumnFamily(originalKey, new Text("test"));
+   assertEquals(createDeleteKey("r", "test", "cq", "cv", 42), withNewFamily);
+
+   Key withNewQualifier = it.replaceColumnQualifier(originalKey, new Text("test"));
+   assertEquals(createDeleteKey("r", "cf", "test", "cv", 42), withNewQualifier);
+
+   Key withNewVisibility = it.replaceColumnVisibility(originalKey, new Text("test"));
+   assertEquals(createDeleteKey("r", "cf", "cq", "test", 42), withNewVisibility);
+
+   // Multi-part replacements: qualifier + visibility, then family + qualifier + visibility.
+   Key withTwoParts = it.replaceKeyParts(originalKey, new Text("testCQ"), new Text("testCV"));
+   assertEquals(createDeleteKey("r", "cf", "testCQ", "testCV", 42), withTwoParts);
+
+   Key withThreeParts = it.replaceKeyParts(originalKey, new Text("testCF"), new Text("testCQ"), new Text("testCV"));
+   assertEquals(createDeleteKey("r", "testCF", "testCQ", "testCV", 42), withThreeParts);
+ }
+
+ /**
+  * Fetches column family cf2 through the column-family-reversing iterator and
+  * expects the entries whose source column family is cf1.
+  */
+ @Test
+ public void testFetchColumnFamilites() throws Exception {
+   // The fetched family (cf2) is expressed in the transformed space. The
+   // source family that transforms into cf2 is cf1, so cf1 is what goes into
+   // the expectations.
+   final int sourceFamily = 1;
+   setUpTransformIterator(ColFamReversingKeyTransformingIterator.class);
+   scanner.fetchColumnFamily(new Text("cf2"));
+
+   TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+   for (int row = 1; row <= 3; ++row) {
+     for (int cq = 1; cq <= 3; ++cq) {
+       for (int cv = 1; cv <= 3; ++cv) {
+         putExpected(expected, row, sourceFamily, cq, cv, PartialKey.ROW);
+       }
+     }
+   }
+   checkExpected(expected);
+ }
+
+ // Placeholder test: the body is empty, so this passes without exercising
+ // anything. Presumably intended to cover the iterator's deep-copy behaviour.
+ // TODO: implement or remove.
+ @Test
+ public void testDeepCopy() throws Exception {
+
+ }
+
+ /**
+  * Same scenario as fetching cf2 through the column-family-reversing
+  * iterator, but using the variant that reports the major-compaction scope.
+  */
+ @Test
+ public void testCompactionScanFetchingColumnFamilies() throws Exception {
+   // The fetched family (cf2) is expressed in the transformed space. The
+   // source family that transforms into cf2 is cf1, so cf1 is what goes into
+   // the expectations.
+   final int sourceFamily = 1;
+   setUpTransformIterator(ColFamReversingCompactionKeyTransformingIterator.class);
+   scanner.fetchColumnFamily(new Text("cf2"));
+
+   TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+   for (int row = 1; row <= 3; ++row) {
+     for (int cq = 1; cq <= 3; ++cq) {
+       for (int cv = 1; cv <= 3; ++cv) {
+         putExpected(expected, row, sourceFamily, cq, cv, PartialKey.ROW);
+       }
+     }
+   }
+   checkExpected(expected);
+ }
+
+ /**
+  * Expects every entry back, rewritten to visibility "badvis", when the
+  * bad-visibility iterator runs with the major-compaction scope.
+  */
+ @Test
+ public void testCompactionDoesntFilterVisibilities() throws Exception {
+   // In scan mode this iterator should return nothing, since it produces
+   // visibilities the user can't see. In compaction mode the rewritten
+   // entries should still show up.
+   setUpTransformIterator(BadVisCompactionKeyTransformingIterator.class);
+
+   TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+   for (int rowID = 1; rowID <= 3; ++rowID) {
+     String row = "row" + rowID;
+     for (int cfID = 1; cfID <= 3; ++cfID) {
+       String cf = "cf" + cfID;
+       for (int cqID = 1; cqID <= 3; ++cqID) {
+         String cq = "cq" + cqID;
+         for (int cvID = 1; cvID <= 3; ++cvID) {
+           // Timestamp and value still encode the original cf/cq/cv ids.
+           long ts = 100 * cfID + 10 * cqID + cvID;
+           String val = "val" + ts;
+           expected.put(new Key(row, cf, cq, "badvis", ts), new Value(val.getBytes()));
+         }
+       }
+     }
+   }
+
+   checkExpected(expected);
+ }
+
+ /**
+  * Builds a key from the given parts and marks it as deleted.
+  *
+  * @return a new key with its deleted flag set to {@code true}
+  */
+ private Key createDeleteKey(String row, String colFam, String colQual, String colVis, long timestamp) {
+   Key deleteKey = new Key(row, colFam, colQual, colVis, timestamp);
+   deleteKey.setDeleted(true);
+   return deleteKey;
+ }
+
+ /**
+  * Iterates the scanner and asserts that it yields exactly the given entries,
+  * in order.
+  *
+  * @param expectedEntries
+  *          expected scan output; entries are removed from this map as they
+  *          are matched, so it is empty after a successful check
+  */
+ private void checkExpected(TreeMap<Key,Value> expectedEntries) {
+   for (Entry<Key,Value> actual : scanner) {
+     // Consume expectations in sorted order, one per scanned entry.
+     Entry<Key,Value> wanted = expectedEntries.pollFirstEntry();
+
+     assertNotNull("Ran out of expected entries on: " + actual, wanted);
+     assertEquals("Key mismatch", wanted.getKey(), actual.getKey());
+     assertEquals("Value mismatch", wanted.getValue(), actual.getValue());
+   }
+
+   // Anything left over was expected but never returned by the scanner.
+   assertTrue("Scanner did not return all expected entries: " + expectedEntries, expectedEntries.isEmpty());
+ }
+
+ /**
+  * Adds one expected entry built from the given ids. The timestamp is
+  * {@code 100*cfID + 10*cqID + cvID} and the value is "val" followed by that
+  * timestamp.
+  *
+  * @param part
+  *          selects the key part to run through {@code transform}:
+  *          {@code ROW} transforms the column family, {@code ROW_COLFAM} the
+  *          column qualifier, {@code ROW_COLFAM_COLQUAL} the column
+  *          visibility; {@code null} or any other value transforms nothing
+  */
+ private static void putExpected(SortedMap<Key,Value> expected, int rowID, int cfID, int cqID, int cvID, PartialKey part) {
+   long ts = 100 * cfID + 10 * cqID + cvID;
+   String family = "cf" + cfID;
+   String qualifier = "cq" + cqID;
+   String visibility = "vis" + cvID;
+
+   // Reference comparisons keep a null part a no-op.
+   if (part == PartialKey.ROW) {
+     family = transform(new Text(family)).toString();
+   } else if (part == PartialKey.ROW_COLFAM) {
+     qualifier = transform(new Text(qualifier)).toString();
+   } else if (part == PartialKey.ROW_COLFAM_COLQUAL) {
+     visibility = transform(new Text(visibility)).toString();
+   }
+
+   Key key = new Key("row" + rowID, family, qualifier, visibility, ts);
+   expected.put(key, new Value(("val" + ts).getBytes()));
+ }
+
+ /**
+  * Replaces the trailing digit {@code n} of the text with {@code 3 - n}
+  * (so 1 -> 2, 2 -> 1, 3 -> 0); the rest of the text is unchanged.
+  * Applying it twice gives back the original text.
+  *
+  * @param val
+  *          text whose last character is a decimal digit
+  * @return a new text with the last digit replaced
+  */
+ private static Text transform(Text val) {
+   String original = val.toString();
+   int lastIndex = original.length() - 1;
+   int flipped = 3 - Integer.parseInt(original.substring(lastIndex));
+   return new Text(original.substring(0, lastIndex) + flipped);
+ }
+
+ /**
+  * Builds a mutation for the given row containing all 27 combinations of
+  * cf1..cf3, cq1..cq3 and vis1..vis3. Each entry has timestamp
+  * {@code 100*cfID + 10*cqID + cvID} and value "val" followed by that
+  * timestamp.
+  */
+ private static Mutation createDefaultMutation(String row) {
+   Mutation mutation = new Mutation(row);
+   for (int cfID = 1; cfID <= 3; ++cfID) {
+     String cf = "cf" + cfID;
+     for (int cqID = 1; cqID <= 3; ++cqID) {
+       String cq = "cq" + cqID;
+       for (int cvID = 1; cvID <= 3; ++cvID) {
+         ColumnVisibility visibility = new ColumnVisibility("vis" + cvID);
+         long ts = 100 * cfID + 10 * cqID + cvID;
+         String val = "val" + ts;
+         mutation.put(cf, cq, visibility, ts, val);
+       }
+     }
+   }
+   return mutation;
+ }
+
+ /**
+  * Returns a copy of the key with one part run through {@code transform}.
+  * Row, timestamp and the delete flag are carried over unchanged.
+  *
+  * @param originalKey
+  *          key to copy
+  * @param part
+  *          fixed key prefix: {@code ROW} transforms the column family,
+  *          {@code ROW_COLFAM} the column qualifier,
+  *          {@code ROW_COLFAM_COLQUAL} the column visibility; any other value
+  *          leaves all parts as they are
+  * @return the new key
+  */
+ private static Key reverseKeyPart(Key originalKey, PartialKey part) {
+   Text row = originalKey.getRow();
+   Text cf = originalKey.getColumnFamily();
+   Text cq = originalKey.getColumnQualifier();
+   Text cv = originalKey.getColumnVisibility();
+   long ts = originalKey.getTimestamp();
+   switch (part) {
+     case ROW:
+       cf = transform(cf);
+       break;
+     case ROW_COLFAM:
+       cq = transform(cq);
+       break;
+     case ROW_COLFAM_COLQUAL:
+       cv = transform(cv);
+       break;
+     default:
+       break;
+   }
+   Key transformedKey = new Key(row, cf, cq, cv, ts);
+   // The five-argument constructor does not carry the delete flag; copy it so
+   // a deletion entry is not turned into a live one by the transform.
+   transformedKey.setDeleted(originalKey.isDeleted());
+   return transformedKey;
+ }
+
+ /**
+  * Iterator whose {@code transformKey} hands back the original key untouched;
+  * it reports {@code PartialKey.ROW} as its key prefix.
+  */
+ public static class IdentityKeyTransformingIterator extends KeyTransformingIterator {
+   @Override
+   protected PartialKey getKeyPrefix() {
+     return PartialKey.ROW;
+   }
+
+   /** Returns the very same key instance that was passed in. */
+   @Override
+   protected Key transformKey(Key originalKey) {
+     return originalKey;
+   }
+ }
+
+ public static abstract class ReversingKeyTransformingIterator extends
KeyTransformingIterator {
+ @Override
+ protected Key transformKey(Key originalKey) {
+ return reverseKeyPart(originalKey, getKeyPrefix());
+ };
+ }
+
+ /**
+  * Reversing iterator that keeps the row fixed (prefix {@code ROW}), so the
+  * column family is the part that gets transformed. It also maps requested
+  * column families back to their untransformed form.
+  */
+ public static class ColFamReversingKeyTransformingIterator extends ReversingKeyTransformingIterator {
+   @Override
+   protected PartialKey getKeyPrefix() {
+     return PartialKey.ROW;
+   }
+
+   /** Maps each given column family through {@link #untransformColumnFamily} and collects the results in a set. */
+   @Override
+   protected Collection<ByteSequence> untransformColumnFamilies(Collection<ByteSequence> columnFamilies) {
+     HashSet<ByteSequence> sourceFamilies = new HashSet<ByteSequence>();
+     for (ByteSequence family : columnFamilies) {
+       sourceFamilies.add(untransformColumnFamily(family));
+     }
+     return sourceFamilies;
+   }
+
+   /**
+    * Applies {@code transform} to the column family. Because that transform
+    * undoes itself when applied twice, this yields the untransformed family.
+    */
+   protected ByteSequence untransformColumnFamily(ByteSequence colFam) {
+     Text sourceFamily = transform(new Text(colFam.toArray()));
+     // Only the first getLength() bytes of the returned array are used.
+     return new ArrayByteSequence(sourceFamily.getBytes(), 0, sourceFamily.getLength());
+   }
+ }
+
+ public static class ColFamReversingCompactionKeyTransformingIterator extends
ColFamReversingKeyTransformingIterator {
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options, IteratorEnvironment env) throws IOException {
+ env = new MajCIteratorEnvironmentAdapter(env);
+ super.init(source, options, env);
+ }
+ }
+
+ /**
+  * Reversing iterator that keeps row and column family fixed, so the column
+  * qualifier is the part that gets transformed.
+  */
+ public static class ColQualReversingKeyTransformingIterator extends ReversingKeyTransformingIterator {
+   /** @return {@code PartialKey.ROW_COLFAM} */
+   @Override
+   protected PartialKey getKeyPrefix() {
+     return PartialKey.ROW_COLFAM;
+   }
+ }
+
+ /**
+  * Reversing iterator that keeps row, column family and column qualifier
+  * fixed, so the column visibility is the part that gets transformed.
+  */
+ public static class ColVisReversingKeyTransformingIterator extends ReversingKeyTransformingIterator {
+   /** @return {@code PartialKey.ROW_COLFAM_COLQUAL} */
+   @Override
+   protected PartialKey getKeyPrefix() {
+     return PartialKey.ROW_COLFAM_COLQUAL;
+   }
+ }
+
+ public static class BadVisKeyTransformingIterator extends
KeyTransformingIterator {
+ @Override
+ protected PartialKey getKeyPrefix() {
+ return PartialKey.ROW_COLFAM_COLQUAL;
+ }
+
+ @Override
+ protected Key transformKey(Key originalKey) {
+ return new Key(originalKey.getRow(), originalKey.getColumnFamily(),
originalKey.getColumnQualifier(), new Text("badvis"),
originalKey.getTimestamp());
+ }
+ }
+
+ public static class BadVisCompactionKeyTransformingIterator extends
BadVisKeyTransformingIterator {
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options, IteratorEnvironment env) throws IOException {
+ env = new MajCIteratorEnvironmentAdapter(env);
+ super.init(source, options, env);
+ }
+ }
+
+ /**
+  * Wrapping iterator that always hands out the same {@code Key} and
+  * {@code Value} instances from {@code getTopKey()} / {@code getTopValue()},
+  * refilling them after every seek and next.
+  */
+ public static class ReuseIterator extends WrappingIterator {
+   // Single instances that are overwritten in place on every advance.
+   private Key topKey = new Key();
+   private Value topValue = new Value();
+
+   @Override
+   public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+     super.seek(range, columnFamilies, inclusive);
+     refreshTop();
+   }
+
+   @Override
+   public void next() throws IOException {
+     super.next();
+     refreshTop();
+   }
+
+   /** @return the shared key instance, not a fresh copy */
+   @Override
+   public Key getTopKey() {
+     return topKey;
+   }
+
+   /** @return the shared value instance, not a fresh copy */
+   @Override
+   public Value getTopValue() {
+     return topValue;
+   }
+
+   /** Loads the wrapped iterator's current entry into the shared instances; leaves them untouched when there is no top. */
+   private void refreshTop() {
+     if (!hasTop()) {
+       return;
+     }
+     topKey.set(super.getTopKey());
+     topValue.set(super.getTopValue().get());
+   }
+ }
+
+ /**
+  * Iterator environment that forwards every call to a wrapped environment
+  * except {@code getIteratorScope()}, which always answers
+  * {@code IteratorScope.majc}.
+  */
+ private static class MajCIteratorEnvironmentAdapter implements IteratorEnvironment {
+   private final IteratorEnvironment delegate;
+
+   /**
+    * @param delegate
+    *          environment that answers everything but the scope
+    */
+   public MajCIteratorEnvironmentAdapter(IteratorEnvironment delegate) {
+     this.delegate = delegate;
+   }
+
+   public SortedKeyValueIterator<Key,Value> reserveMapFileReader(String mapFileName) throws IOException {
+     return delegate.reserveMapFileReader(mapFileName);
+   }
+
+   public AccumuloConfiguration getConfig() {
+     return delegate.getConfig();
+   }
+
+   /** Always reports the major-compaction scope, whatever the delegate's scope is. */
+   public IteratorScope getIteratorScope() {
+     return IteratorScope.majc;
+   }
+
+   public boolean isFullMajorCompaction() {
+     return delegate.isFullMajorCompaction();
+   }
+
+   public void registerSideChannel(SortedKeyValueIterator<Key,Value> iter) {
+     delegate.registerSideChannel(iter);
+   }
+ }
+}