Author: kturner
Date: Thu Jan 17 23:17:20 2013
New Revision: 1434955
URL: http://svn.apache.org/viewvc?rev=1434955&view=rev
Log:
ACCUMULO-956 generalized transforming iterator, added some sanity checks to it,
added some more unit tests, added some static config methods
Added:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java
- copied, changed from r1434936,
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
- copied, changed from r1434936,
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
Removed:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
Copied:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java
(from r1434936,
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java)
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java?p2=accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java&p1=accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java&r1=1434936&r2=1434955&rev=1434955&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/KeyTransformingIterator.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/iterators/user/TransformingIterator.java
Thu Jan 17 23:17:20 2013
@@ -23,7 +23,10 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
@@ -40,12 +43,13 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.VisibilityParseException;
import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.Pair;
+import org.apache.commons.collections.BufferOverflowException;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
/**
- * The KeyTransformingIterator allows portions of a key (except for the row)
to be transformed. This iterator handles the details that come with modifying
keys
+ * The TransformingIterator allows portions of a key (except for the row) to
be transformed. This iterator handles the details that come with modifying keys
* (i.e., that the sort order could change). In order to do so, however, the
iterator must put all keys sharing the same prefix in memory. Prefix is defined
as
* the parts of the key that are not modified by this iterator. That is, if
the iterator modifies column qualifier and timestamp, then the prefix is row and
* column family. In that case, the iterator must load all column qualifiers
for each row/column family pair into memory. Given this constraint, care must be
@@ -70,8 +74,11 @@ import org.apache.log4j.Logger;
* major and minor compactions. It should also be noted that this iterator
implements the security filtering rather than relying on a follow-on iterator
to do
* it so that we ensure the test is performed.
*/
-abstract public class KeyTransformingIterator extends WrappingIterator
implements OptionDescriber {
+abstract public class TransformingIterator extends WrappingIterator implements
OptionDescriber {
public static final String AUTH_OPT = "authorizations";
+ public static final String MAX_BUFFER_SIZE_OPT = "maxBufferSize";
+ private static final long DEFAULT_MAX_BUFFER_SIZE = 10000000;
+
protected Logger log = Logger.getLogger(getClass());
protected ArrayList<Pair<Key,Value>> keys = new ArrayList<Pair<Key,Value>>();
@@ -84,6 +91,7 @@ abstract public class KeyTransformingIte
private VisibilityEvaluator ve = null;
private LRUMap visibleCache = null;
private LRUMap parsedVisibilitiesCache = null;
+ private long maxBufferSize;
private static Comparator<Pair<Key,Value>> keyComparator = new
Comparator<Pair<Key,Value>>() {
@Override
@@ -92,7 +100,7 @@ abstract public class KeyTransformingIte
}
};
- public KeyTransformingIterator() {}
+ public TransformingIterator() {}
@Override
public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options, IteratorEnvironment env) throws IOException {
@@ -101,11 +109,17 @@ abstract public class KeyTransformingIte
if (scanning) {
String auths = options.get(AUTH_OPT);
if (auths != null && !auths.isEmpty()) {
- ve = new VisibilityEvaluator(new Authorizations(auths.split(",")));
+ ve = new VisibilityEvaluator(new Authorizations(auths.getBytes()));
visibleCache = new LRUMap(100);
}
}
+ if (options.containsKey(MAX_BUFFER_SIZE_OPT)) {
+ maxBufferSize =
AccumuloConfiguration.getMemoryInBytes(options.get(MAX_BUFFER_SIZE_OPT));
+ } else {
+ maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;
+ }
+
parsedVisibilitiesCache = new LRUMap(100);
}
@@ -124,7 +138,7 @@ abstract public class KeyTransformingIte
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
- KeyTransformingIterator copy;
+ TransformingIterator copy;
try {
copy = getClass().newInstance();
@@ -208,7 +222,60 @@ abstract public class KeyTransformingIte
transformKeys();
}
}
-
+
+ private class RangeIterator implements SortedKeyValueIterator<Key,Value> {
+
+ private SortedKeyValueIterator<Key,Value> source;
+ private Key prefixKey;
+ private boolean hasTop = false;
+
+ RangeIterator(SortedKeyValueIterator<Key,Value> source, Key prefixKey) {
+ this.source = source;
+ this.prefixKey = prefixKey;
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options, IteratorEnvironment env) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasTop() {
+ // only have a top if the prefix matches
+ return hasTop = source.hasTop() && source.getTopKey().equals(prefixKey,
getKeyPrefix());
+ }
+
+ @Override
+ public void next() throws IOException {
+ // do not let user advance too far and try to avoid reexecuting hasTop()
+ if (!hasTop && !hasTop())
+ throw new NoSuchElementException();
+ hasTop = false;
+ source.next();
+ }
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Key getTopKey() {
+ return source.getTopKey();
+ }
+
+ @Override
+ public Value getTopValue() {
+ return source.getTopValue();
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env)
{
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
/**
* Reads all keys matching the first key's prefix from the source iterator,
transforms them, and sorts the resulting keys. Transformed keys that fall
outside
* of our seek range or can't be seen by the user are excluded.
@@ -216,32 +283,37 @@ abstract public class KeyTransformingIte
protected void transformKeys() throws IOException {
keyPos = -1;
keys.clear();
- Key prefixKey = super.hasTop() ? new Key(super.getTopKey()) : null;
+ final Key prefixKey = super.hasTop() ? new Key(super.getTopKey()) : null;
- while (super.hasTop()) {
- Key sourceTopKey = super.getTopKey();
+ transformRange(new RangeIterator(getSource(), prefixKey), new KVBuffer() {
- // If the source key equals our prefix key (up to the prefix), then
- // we have a key that needs transformed. Otherwise, we're done.
- if (sourceTopKey.equals(prefixKey, getKeyPrefix())) {
- Key transformedKey = transformKey(sourceTopKey);
-
- // If the transformed key didn't actually change, then we need
- // to make a copy since we're caching it in our keys list.
- if (transformedKey == sourceTopKey)
- transformedKey = new Key(sourceTopKey);
- // We could check that the transformed key didn't transform anything
- // in the key prefix here...
+ long appened = 0;
+
+ @Override
+ public void append(Key key, Value val) {
+ // ensure the key provided by the user has the correct prefix
+ if (!key.equals(prefixKey, getKeyPrefix()))
+ throw new IllegalArgumentException("Key prefixes are not equal " +
key + " " + prefixKey);
// Transformation could have produced a key that falls outside
// of the seek range, or one that the user cannot see. Check
// these before adding it to the output list.
- if (includeTransformedKey(transformedKey))
- keys.add(new Pair<Key,Value>(transformedKey, new
Value(super.getTopValue())));
- } else {
- break;
+ if (includeTransformedKey(key)) {
+
+ // try to defend against a scan or compaction using all memory in a
tablet server
+ if (appened > maxBufferSize)
+ throw new BufferOverflowException("Exceeded buffer size of " +
maxBufferSize + ", prefixKey: " + prefixKey);
+
+ if (getSource().hasTop() && key == getSource().getTopKey())
+ key = new Key(key);
+ keys.add(new Pair<Key,Value>(key, new Value(val)));
+ appened += (key.getSize() + val.getSize() + 128);
+ }
}
-
+ });
+
+ // consume any key in range that user did not consume
+ while (super.hasTop() && super.getTopKey().equals(prefixKey,
getKeyPrefix())) {
super.next();
}
@@ -531,19 +603,49 @@ abstract public class KeyTransformingIte
*/
abstract protected PartialKey getKeyPrefix();
+ public static interface KVBuffer {
+ void append(Key key, Value val);
+ }
+
/**
* Transforms {@code originalKey}. This method must not change the row part
of the key, and must only change the parts of the key after the return value of
* {@link #getKeyPrefix()}. Implementors must also remember to copy the
delete flag from {@code originalKey} onto the new key. Or, implementors should
use one
* of the helper methods to produce the new key. See any of the
replaceKeyParts methods.
*
- * @param originalKey
- * the key to be transformed
- * @return the modified key
+ * @param input
+ * An iterator over a group of keys with the same prefix. This
iterator provides an efficient view, bounded by the prefix, of the underlying
iterator
+ * and can not be seeked.
+ * @param output
+ * An output buffer that holds transformed key values. All key
values added to the buffer must have the same prefix as the input keys.
+ * @throws IOException
* @see #replaceColumnFamily(Key, Text)
* @see #replaceColumnQualifier(Key, Text)
* @see #replaceColumnVisibility(Key, Text)
* @see #replaceKeyParts(Key, Text, Text)
* @see #replaceKeyParts(Key, Text, Text, Text)
*/
- abstract protected Key transformKey(Key originalKey);
+
+ abstract protected void transformRange(SortedKeyValueIterator<Key,Value>
input, KVBuffer output) throws IOException;
+
+ /**
+ * Configure authoriations used for post transformation filtering.
+ *
+ * @param config
+ * @param auths
+ */
+ public static void setAutorizations(IteratorSetting config, Authorizations
auths) {
+ config.addOption(AUTH_OPT, auths.serialize());
+ }
+
+ /**
+ * Configure the maximum amount of memory that can be used for
transformation. If this memory is exceeded an exception will be thrown.
+ *
+ * @param config
+ * @param maxBufferSize
+ * size in bytes
+ */
+ public static void setMaxBufferSize(IteratorSetting config, long
maxBufferSize) {
+ config.addOption(MAX_BUFFER_SIZE_OPT, maxBufferSize + "");
+ }
+
}
Copied:
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
(from r1434936,
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java)
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java?p2=accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java&p1=accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java&r1=1434936&r2=1434955&rev=1434955&view=diff
==============================================================================
---
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/KeyTransformingIteratorTest.java
(original)
+++
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/user/TransformingIteratorTest.java
Thu Jan 17 23:17:20 2013
@@ -58,7 +58,7 @@ import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Test;
-public class KeyTransformingIteratorTest {
+public class TransformingIteratorTest {
private static final String TABLE_NAME = "test_table";
private static Authorizations authorizations = new Authorizations("vis0",
"vis1", "vis2", "vis3", "vis4");
private Connector connector;
@@ -88,10 +88,10 @@ public class KeyTransformingIteratorTest
scanner.addScanIterator(new IteratorSetting(20, ReuseIterator.class));
}
- private void setUpTransformIterator(Class<? extends KeyTransformingIterator>
clazz) {
+ private void setUpTransformIterator(Class<? extends TransformingIterator>
clazz) {
IteratorSetting cfg = new IteratorSetting(21, clazz);
cfg.setName("keyTransformIter");
- cfg.addOption(KeyTransformingIterator.AUTH_OPT, "vis0, vis1, vis2, vis3");
+ TransformingIterator.setAutorizations(cfg, new Authorizations("vis0",
"vis1", "vis2", "vis3"));
scanner.addScanIterator(cfg);
}
@@ -129,7 +129,7 @@ public class KeyTransformingIteratorTest
setUpTransformIterator(clazz);
// All rows with visibilities reversed
- KeyTransformingIterator iter = clazz.newInstance();
+ TransformingIterator iter = clazz.newInstance();
TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
for (int row = 1; row <= 3; ++row) {
for (int cf = 1; cf <= 3; ++cf) {
@@ -235,7 +235,7 @@ public class KeyTransformingIteratorTest
@Test
public void testReplaceKeyParts() throws Exception {
- KeyTransformingIterator it = new IdentityKeyTransformingIterator();
+ TransformingIterator it = new IdentityKeyTransformingIterator();
Key originalKey = new Key("r", "cf", "cq", "cv", 42);
originalKey.setDeleted(true);
@@ -485,25 +485,32 @@ public class KeyTransformingIteratorTest
return new Key(row, cf, cq, cv, ts);
}
- public static class IdentityKeyTransformingIterator extends
KeyTransformingIterator {
+ public static class IdentityKeyTransformingIterator extends
TransformingIterator {
@Override
protected PartialKey getKeyPrefix() {
return PartialKey.ROW;
}
@Override
- protected Key transformKey(Key originalKey) {
- return originalKey;
- };
+ protected void transformRange(SortedKeyValueIterator<Key,Value> input,
KVBuffer output) throws IOException {
+ while (input.hasTop()) {
+ output.append(input.getTopKey(), input.getTopValue());
+ input.next();
+ }
+ }
}
- public static class DupeTransformingIterator extends KeyTransformingIterator
{
- @Override
- protected Key transformKey(Key originalKey) {
- Key ret = replaceKeyParts(originalKey, new Text("cf1"), new Text("cq1"),
new Text(""));
- ret.setTimestamp(5);
- return ret;
- };
+ public static class DupeTransformingIterator extends TransformingIterator {
+ @Override
+ protected void transformRange(SortedKeyValueIterator<Key,Value> input,
KVBuffer output) throws IOException {
+ while (input.hasTop()) {
+ Key originalKey = input.getTopKey();
+ Key ret = replaceKeyParts(originalKey, new Text("cf1"), new
Text("cq1"), new Text(""));
+ ret.setTimestamp(5);
+ output.append(ret, input.getTopValue());
+ input.next();
+ }
+ }
@Override
protected PartialKey getKeyPrefix() {
@@ -512,11 +519,16 @@ public class KeyTransformingIteratorTest
}
- public static abstract class ReversingKeyTransformingIterator extends
KeyTransformingIterator {
+ public static abstract class ReversingKeyTransformingIterator extends
TransformingIterator {
+
@Override
- protected Key transformKey(Key originalKey) {
- return reverseKeyPart(originalKey, getKeyPrefix());
- };
+ protected void transformRange(SortedKeyValueIterator<Key,Value> input,
KVBuffer output) throws IOException {
+ while (input.hasTop()) {
+ Key originalKey = input.getTopKey();
+ output.append(reverseKeyPart(originalKey, getKeyPrefix()),
input.getTopValue());
+ input.next();
+ }
+ }
}
public static class ColFamReversingKeyTransformingIterator extends
ReversingKeyTransformingIterator {
@@ -562,15 +574,21 @@ public class KeyTransformingIteratorTest
}
}
- public static class IllegalVisKeyTransformingIterator extends
KeyTransformingIterator {
+ public static class IllegalVisKeyTransformingIterator extends
TransformingIterator {
@Override
protected PartialKey getKeyPrefix() {
return PartialKey.ROW_COLFAM_COLQUAL;
}
-
+
@Override
- protected Key transformKey(Key originalKey) {
- return new Key(originalKey.getRow(), originalKey.getColumnFamily(),
originalKey.getColumnQualifier(), new Text("A&|||"),
originalKey.getTimestamp());
+ protected void transformRange(SortedKeyValueIterator<Key,Value> input,
KVBuffer output) throws IOException {
+ while (input.hasTop()) {
+ Key originalKey = input.getTopKey();
+ output.append(
+ new Key(originalKey.getRow(), originalKey.getColumnFamily(),
originalKey.getColumnQualifier(), new Text("A&|||"),
originalKey.getTimestamp()),
+ input.getTopValue());
+ input.next();
+ }
}
}
@@ -582,15 +600,21 @@ public class KeyTransformingIteratorTest
}
}
- public static class BadVisKeyTransformingIterator extends
KeyTransformingIterator {
+ public static class BadVisKeyTransformingIterator extends
TransformingIterator {
@Override
protected PartialKey getKeyPrefix() {
return PartialKey.ROW_COLFAM_COLQUAL;
}
@Override
- protected Key transformKey(Key originalKey) {
- return new Key(originalKey.getRow(), originalKey.getColumnFamily(),
originalKey.getColumnQualifier(), new Text("badvis"),
originalKey.getTimestamp());
+ protected void transformRange(SortedKeyValueIterator<Key,Value> input,
KVBuffer output) throws IOException {
+ while (input.hasTop()) {
+ Key originalKey = input.getTopKey();
+ output.append(
+ new Key(originalKey.getRow(), originalKey.getColumnFamily(),
originalKey.getColumnQualifier(), new Text("badvis"),
originalKey.getTimestamp()),
+ input.getTopValue());
+ input.next();
+ }
}
}