Repository: accumulo Updated Branches: refs/heads/master ce4017884 -> f81a8ec74
ACCUMULO-3079: Collapsed iterator stack by creating new optimized iterators Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b56a3349 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b56a3349 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b56a3349 Branch: refs/heads/master Commit: b56a334932362f1b00f2c6a80f9d7298c204d1d6 Parents: ce40178 Author: Mike Miller <[email protected]> Authored: Fri Mar 31 12:43:45 2017 -0400 Committer: Mike Miller <[email protected]> Committed: Thu Apr 27 15:32:50 2017 -0400 ---------------------------------------------------------------------- .../accumulo/core/iterators/IteratorUtil.java | 3 +- .../accumulo/core/iterators/ServerFilter.java | 78 ++++++++++++++++ .../core/iterators/ServerSkippingIterator.java | 54 +++++++++++ .../core/iterators/ServerWrappingIterator.java | 79 ++++++++++++++++ .../iterators/SynchronizedServerFilter.java | 94 ++++++++++++++++++++ .../system/ColumnFamilySkippingIterator.java | 30 +++---- .../iterators/system/ColumnQualifierFilter.java | 62 ++++++------- .../core/iterators/system/DeletingIterator.java | 28 +++--- .../core/iterators/system/StatsIterator.java | 12 +-- .../iterators/system/SynchronizedIterator.java | 7 +- .../core/iterators/system/VisibilityFilter.java | 40 +++++---- .../core/iterators/user/VisibilityFilter.java | 49 +++++++--- .../problems/ProblemReportingIterator.java | 4 +- .../apache/accumulo/tserver/InMemoryMap.java | 7 -- 14 files changed, 428 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java 
b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java index 8bd72bf..338da90 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java @@ -44,7 +44,6 @@ import org.apache.accumulo.core.data.thrift.IterInfo; import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; import org.apache.accumulo.core.iterators.system.DeletingIterator; -import org.apache.accumulo.core.iterators.system.SynchronizedIterator; import org.apache.accumulo.core.iterators.system.VisibilityFilter; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.security.Authorizations; @@ -255,7 +254,7 @@ public class IteratorUtil { Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, IteratorEnvironment env, boolean useAccumuloClassLoader, String context, Map<String,Class<? extends SortedKeyValueIterator<K,V>>> classCache) throws IOException { // wrap the source in a SynchronizedIterator in case any of the additional configured iterators want to use threading - SortedKeyValueIterator<K,V> prev = new SynchronizedIterator<>(source); + SortedKeyValueIterator<K,V> prev = source; try { for (IterInfo iterInfo : iters) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/ServerFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/ServerFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/ServerFilter.java new file mode 100644 index 0000000..c6db6c4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/ServerFilter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; + +/** + * An optimized version of {@link org.apache.accumulo.core.iterators.Filter}. This class grants protected access to the read only <code>source</code> iterator. + * For performance reasons, the <code>source</code> iterator is declared final and subclasses directly access it, no longer requiring calls to getSource(). The + * {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method is not supported since the source can only be assigned in the constructor. 
+ * + * @since 2.0 + */ +public abstract class ServerFilter extends ServerWrappingIterator { + + public ServerFilter(SortedKeyValueIterator<Key,Value> source) { + super(source); + } + + @Override + public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env); + + @Override + public void next() throws IOException { + source.next(); + findTop(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + source.seek(range, columnFamilies, inclusive); + findTop(); + } + + /** + * Iterates over the source until an acceptable key/value pair is found. + */ + private void findTop() throws IOException { + while (source.hasTop()) { + Key top = source.getTopKey(); + if (top.isDeleted() || (accept(top, source.getTopValue()))) { + break; + } + source.next(); + } + } + + /** + * @return <tt>true</tt> if the key/value pair is accepted by the filter. + */ + public abstract boolean accept(Key k, Value v); + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/ServerSkippingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/ServerSkippingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/ServerSkippingIterator.java new file mode 100644 index 0000000..fb6c59c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/ServerSkippingIterator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; + +/** + * An optimized version of {@link org.apache.accumulo.core.iterators.SkippingIterator}. This class grants protected access to the read only <code>source</code> + * iterator. For performance reasons, the <code>source</code> iterator is declared final and subclasses directly access it, no longer requiring calls to + * getSource(). 
+ * + * @since 2.0 + */ +public abstract class ServerSkippingIterator extends ServerWrappingIterator { + + public ServerSkippingIterator(SortedKeyValueIterator<Key,Value> source) { + super(source); + } + + @Override + public void next() throws IOException { + source.next(); + consume(); + } + + protected abstract void consume() throws IOException; + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + source.seek(range, columnFamilies, inclusive); + consume(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/ServerWrappingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/ServerWrappingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/ServerWrappingIterator.java new file mode 100644 index 0000000..7f084c9 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/ServerWrappingIterator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.iterators; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; + +/** + * An optimized version of {@link org.apache.accumulo.core.iterators.WrappingIterator}. This class grants protected access to the read only <code>source</code> + * iterator. For performance reasons, the <code>source</code> iterator is declared final and subclasses directly access it, no longer requiring calls to + * getSource(). The {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method is not supported since the source can only be assigned in the + * constructor. As with the WrappingIterator, the {@link #deepCopy(IteratorEnvironment)} method is not supported. + * + * @since 2.0 + */ +public abstract class ServerWrappingIterator implements SortedKeyValueIterator<Key,Value> { + + protected final SortedKeyValueIterator<Key,Value> source; + + public ServerWrappingIterator(SortedKeyValueIterator<Key,Value> source) { + this.source = source; + } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + public Key getTopKey() { + return source.getTopKey(); + } + + @Override + public Value getTopValue() { + return source.getTopValue(); + } + + @Override + public boolean hasTop() { + return source.hasTop(); + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void next() throws IOException { + source.next(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + source.seek(range, 
columnFamilies, inclusive); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/SynchronizedServerFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/SynchronizedServerFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/SynchronizedServerFilter.java new file mode 100644 index 0000000..b2ad4c2 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iterators/SynchronizedServerFilter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.iterators; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; + +/** + * A SortedKeyValueIterator similar to {@link org.apache.accumulo.core.iterators.ServerFilter} but with the implemented methods marked as synchronized. 
The + * {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method is also not supported since the source can only be assigned in the constructor. + * + * @since 2.0 + */ +public abstract class SynchronizedServerFilter implements SortedKeyValueIterator<Key,Value> { + + protected final SortedKeyValueIterator<Key,Value> source; + + public SynchronizedServerFilter(SortedKeyValueIterator<Key,Value> source) { + this.source = source; + } + + @Override + public abstract SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env); + + @Override + public synchronized void next() throws IOException { + source.next(); + findTop(); + } + + @Override + public synchronized void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + source.seek(range, columnFamilies, inclusive); + findTop(); + } + + @Override + public synchronized Key getTopKey() { + return source.getTopKey(); + } + + @Override + public synchronized Value getTopValue() { + return source.getTopValue(); + } + + @Override + public synchronized boolean hasTop() { + return source.hasTop(); + } + + /** + * Iterates over the source until an acceptable key/value pair is found. + */ + private void findTop() throws IOException { + while (source.hasTop()) { + Key top = source.getTopKey(); + if (top.isDeleted() || (accept(top, source.getTopValue()))) { + break; + } + source.next(); + } + } + + /** + * @return <tt>true</tt> if the key/value pair is accepted by the filter. 
+ */ + protected abstract boolean accept(Key k, Value v); + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnFamilySkippingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnFamilySkippingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnFamilySkippingIterator.java index 53f3643..8728445 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnFamilySkippingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnFamilySkippingIterator.java @@ -29,10 +29,10 @@ import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SkippingIterator; +import org.apache.accumulo.core.iterators.ServerSkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -public class ColumnFamilySkippingIterator extends SkippingIterator implements InterruptibleIterator { +public class ColumnFamilySkippingIterator extends ServerSkippingIterator implements InterruptibleIterator { protected Set<ByteSequence> colFamSet = null; protected TreeSet<ByteSequence> sortedColFams = null; @@ -41,7 +41,7 @@ public class ColumnFamilySkippingIterator extends SkippingIterator implements In protected Range range; public ColumnFamilySkippingIterator(SortedKeyValueIterator<Key,Value> source) { - this.setSource(source); + super(source); } protected ColumnFamilySkippingIterator(SortedKeyValueIterator<Key,Value> source, Set<ByteSequence> 
colFamSet, boolean inclusive) { @@ -55,33 +55,33 @@ public class ColumnFamilySkippingIterator extends SkippingIterator implements In int count = 0; if (inclusive) - while (getSource().hasTop() && !colFamSet.contains(getSource().getTopKey().getColumnFamilyData())) { + while (source.hasTop() && !colFamSet.contains(source.getTopKey().getColumnFamilyData())) { if (count < 10) { // it is quicker to call next if we are close, but we never know if we are close // so give next a try a few times - getSource().next(); + source.next(); count++; } else { - ByteSequence higherCF = sortedColFams.higher(getSource().getTopKey().getColumnFamilyData()); + ByteSequence higherCF = sortedColFams.higher(source.getTopKey().getColumnFamilyData()); if (higherCF == null) { // seek to the next row - reseek(getSource().getTopKey().followingKey(PartialKey.ROW)); + reseek(source.getTopKey().followingKey(PartialKey.ROW)); } else { // seek to the next column family in the sorted list of column families - reseek(new Key(getSource().getTopKey().getRowData().toArray(), higherCF.toArray(), new byte[0], new byte[0], Long.MAX_VALUE)); + reseek(new Key(source.getTopKey().getRowData().toArray(), higherCF.toArray(), new byte[0], new byte[0], Long.MAX_VALUE)); } count = 0; } } else if (colFamSet != null && colFamSet.size() > 0) - while (getSource().hasTop() && colFamSet.contains(getSource().getTopKey().getColumnFamilyData())) { + while (source.hasTop() && colFamSet.contains(source.getTopKey().getColumnFamilyData())) { if (count < 10) { - getSource().next(); + source.next(); count++; } else { // seek to the next column family in the data - reseek(getSource().getTopKey().followingKey(PartialKey.ROW_COLFAM)); + reseek(source.getTopKey().followingKey(PartialKey.ROW_COLFAM)); count = 0; } } @@ -90,16 +90,16 @@ public class ColumnFamilySkippingIterator extends SkippingIterator implements In private void reseek(Key key) throws IOException { if (range.afterEndKey(key)) { range = new Range(range.getEndKey(), true, 
range.getEndKey(), range.isEndKeyInclusive()); - getSource().seek(range, colFamSet, inclusive); + source.seek(range, colFamSet, inclusive); } else { range = new Range(key, true, range.getEndKey(), range.isEndKeyInclusive()); - getSource().seek(range, colFamSet, inclusive); + source.seek(range, colFamSet, inclusive); } } @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - return new ColumnFamilySkippingIterator(getSource().deepCopy(env), colFamSet, inclusive); + return new ColumnFamilySkippingIterator(source.deepCopy(env), colFamSet, inclusive); } @Override @@ -125,7 +125,7 @@ public class ColumnFamilySkippingIterator extends SkippingIterator implements In @Override public void setInterruptFlag(AtomicBoolean flag) { - ((InterruptibleIterator) getSource()).setInterruptFlag(flag); + ((InterruptibleIterator) source).setInterruptFlag(flag); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java index 866f04f..dbd9171 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/ColumnQualifierFilter.java @@ -18,7 +18,6 @@ package org.apache.accumulo.core.iterators.system; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import org.apache.accumulo.core.data.ArrayByteSequence; @@ -26,25 +25,43 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.Filter; 
import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.ServerFilter; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -public class ColumnQualifierFilter extends Filter { - private boolean scanColumns; +public class ColumnQualifierFilter extends ServerFilter { + private final boolean scanColumns; private HashSet<ByteSequence> columnFamilies; private HashMap<ByteSequence,HashSet<ByteSequence>> columnsQualifiers; - public ColumnQualifierFilter() {} - public ColumnQualifierFilter(SortedKeyValueIterator<Key,Value> iterator, Set<Column> columns) { - setSource(iterator); - init(columns); + super(iterator); + this.columnFamilies = new HashSet<>(); + this.columnsQualifiers = new HashMap<>(); + + for (Column col : columns) { + if (col.columnQualifier != null) { + ArrayByteSequence cq = new ArrayByteSequence(col.columnQualifier); + HashSet<ByteSequence> cfset = this.columnsQualifiers.get(cq); + if (cfset == null) { + cfset = new HashSet<>(); + this.columnsQualifiers.put(cq, cfset); + } + + cfset.add(new ArrayByteSequence(col.columnFamily)); + } else { + // this whole column family should pass + columnFamilies.add(new ArrayByteSequence(col.columnFamily)); + } + } + + // only take action when column qualifies are present + scanColumns = this.columnsQualifiers.size() > 0; } public ColumnQualifierFilter(SortedKeyValueIterator<Key,Value> iterator, HashSet<ByteSequence> columnFamilies, HashMap<ByteSequence,HashSet<ByteSequence>> columnsQualifiers, boolean scanColumns) { - setSource(iterator); + super(iterator); this.columnFamilies = columnFamilies; this.columnsQualifiers = columnsQualifiers; this.scanColumns = scanColumns; @@ -65,33 +82,8 @@ public class ColumnQualifierFilter extends Filter { return cfset != null && cfset.contains(key.getColumnFamilyData()); } - public void init(Set<Column> columns) { - this.columnFamilies = new HashSet<>(); - this.columnsQualifiers = new HashMap<>(); - - for (Iterator<Column> 
iter = columns.iterator(); iter.hasNext();) { - Column col = iter.next(); - if (col.columnQualifier != null) { - ArrayByteSequence cq = new ArrayByteSequence(col.columnQualifier); - HashSet<ByteSequence> cfset = this.columnsQualifiers.get(cq); - if (cfset == null) { - cfset = new HashSet<>(); - this.columnsQualifiers.put(cq, cfset); - } - - cfset.add(new ArrayByteSequence(col.columnFamily)); - } else { - // this whole column family should pass - columnFamilies.add(new ArrayByteSequence(col.columnFamily)); - } - } - - // only take action when column qualifies are present - scanColumns = this.columnsQualifiers.size() > 0; - } - @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - return new ColumnQualifierFilter(getSource().deepCopy(env), columnFamilies, columnsQualifiers, scanColumns); + return new ColumnQualifierFilter(source.deepCopy(env), columnFamilies, columnsQualifiers, scanColumns); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/system/DeletingIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/DeletingIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/DeletingIterator.java index abdb6c1..e33ee3a 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/DeletingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/DeletingIterator.java @@ -27,10 +27,10 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.ServerWrappingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.WrappingIterator; -public class DeletingIterator 
extends WrappingIterator { +public class DeletingIterator extends ServerWrappingIterator { private boolean propogateDeletes; private Key workKey = new Key(); @@ -40,23 +40,21 @@ public class DeletingIterator extends WrappingIterator { } public DeletingIterator(DeletingIterator other, IteratorEnvironment env) { - setSource(other.getSource().deepCopy(env)); + super(other.source.deepCopy(env)); propogateDeletes = other.propogateDeletes; } - public DeletingIterator() {} - public DeletingIterator(SortedKeyValueIterator<Key,Value> iterator, boolean propogateDeletes) throws IOException { - this.setSource(iterator); + super(iterator); this.propogateDeletes = propogateDeletes; } @Override public void next() throws IOException { - if (super.getTopKey().isDeleted()) + if (source.getTopKey().isDeleted()) skipRowColumn(); else - getSource().next(); + source.next(); findTop(); } @@ -65,11 +63,11 @@ public class DeletingIterator extends WrappingIterator { // do not want to seek to the middle of a row Range seekRange = IteratorUtil.maximizeStartKeyTimeStamp(range); - super.seek(seekRange, columnFamilies, inclusive); + source.seek(seekRange, columnFamilies, inclusive); findTop(); if (range.getStartKey() != null) { - while (getSource().hasTop() && getSource().getTopKey().compareTo(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME) < 0) { + while (source.hasTop() && source.getTopKey().compareTo(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME) < 0) { next(); } @@ -81,20 +79,20 @@ public class DeletingIterator extends WrappingIterator { private void findTop() throws IOException { if (!propogateDeletes) { - while (getSource().hasTop() && getSource().getTopKey().isDeleted()) { + while (source.hasTop() && source.getTopKey().isDeleted()) { skipRowColumn(); } } } private void skipRowColumn() throws IOException { - workKey.set(getSource().getTopKey()); + workKey.set(source.getTopKey()); Key keyToSkip = workKey; - getSource().next(); + source.next(); - while 
(getSource().hasTop() && getSource().getTopKey().equals(keyToSkip, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { - getSource().next(); + while (source.hasTop() && source.getTopKey().equals(keyToSkip, PartialKey.ROW_COLFAM_COLQUAL_COLVIS)) { + source.next(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/system/StatsIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/StatsIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/StatsIterator.java index f92d1ec..160a29f 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/StatsIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/StatsIterator.java @@ -25,27 +25,27 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.ServerWrappingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.WrappingIterator; /** * */ -public class StatsIterator extends WrappingIterator { +public class StatsIterator extends ServerWrappingIterator { private int numRead = 0; private AtomicLong seekCounter; private AtomicLong readCounter; public StatsIterator(SortedKeyValueIterator<Key,Value> source, AtomicLong seekCounter, AtomicLong readCounter) { - super.setSource(source); + super(source); this.seekCounter = seekCounter; this.readCounter = readCounter; } @Override public void next() throws IOException { - super.next(); + source.next(); numRead++; if (numRead % 23 == 0) { @@ -56,12 +56,12 @@ public class StatsIterator extends WrappingIterator { @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - return new 
StatsIterator(getSource().deepCopy(env), seekCounter, readCounter); + return new StatsIterator(source.deepCopy(env), seekCounter, readCounter); } @Override public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - super.seek(range, columnFamilies, inclusive); + source.seek(range, columnFamilies, inclusive); seekCounter.incrementAndGet(); readCounter.addAndGet(numRead); numRead = 0; http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java index 43da54d..fab0439 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/SynchronizedIterator.java @@ -33,12 +33,11 @@ import org.apache.hadoop.io.WritableComparable; */ public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writable> implements SortedKeyValueIterator<K,V> { - private SortedKeyValueIterator<K,V> source = null; + private final SortedKeyValueIterator<K,V> source; @Override public synchronized void init(SortedKeyValueIterator<K,V> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - this.source = source; - source.init(source, options, env); + throw new UnsupportedOperationException(); } @Override @@ -71,8 +70,6 @@ public class SynchronizedIterator<K extends WritableComparable<?>,V extends Writ return new SynchronizedIterator<>(source.deepCopy(env)); } - public SynchronizedIterator() {} - public SynchronizedIterator(SortedKeyValueIterator<K,V> source) { this.source = source; } 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java index a204ad1..b260bc4 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/system/VisibilityFilter.java @@ -16,54 +16,56 @@ */ package org.apache.accumulo.core.iterators.system; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.SynchronizedServerFilter; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.VisibilityEvaluator; import org.apache.accumulo.core.security.VisibilityParseException; import org.apache.accumulo.core.util.BadArgumentException; -import org.apache.accumulo.core.util.TextUtil; import org.apache.commons.collections.map.LRUMap; -import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class VisibilityFilter extends Filter { +/** + * A SortedKeyValueIterator that filters based on ColumnVisibility and optimized for use with system iterators. 
Prior to 2.0, this class extended + * {@link org.apache.accumulo.core.iterators.Filter} and all system iterators were wrapped with a <code>SynchronizedIterator</code> during creation of the + * iterator stack in {@link org.apache.accumulo.core.iterators.IteratorUtil} .loadIterators(). For performance reasons, the synchronization was pushed down the + * stack to this class. + */ +public class VisibilityFilter extends SynchronizedServerFilter { protected VisibilityEvaluator ve; - protected Text defaultVisibility; + protected ByteSequence defaultVisibility; protected LRUMap cache; - protected Text tmpVis; protected Authorizations authorizations; private static final Logger log = LoggerFactory.getLogger(VisibilityFilter.class); - public VisibilityFilter() {} - public VisibilityFilter(SortedKeyValueIterator<Key,Value> iterator, Authorizations authorizations, byte[] defaultVisibility) { - setSource(iterator); + super(iterator); this.ve = new VisibilityEvaluator(authorizations); this.authorizations = authorizations; - this.defaultVisibility = new Text(defaultVisibility); + this.defaultVisibility = new ArrayByteSequence(defaultVisibility); this.cache = new LRUMap(1000); - this.tmpVis = new Text(); } @Override - public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - return new VisibilityFilter(getSource().deepCopy(env), authorizations, TextUtil.getBytes(defaultVisibility)); + public synchronized SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + return new VisibilityFilter(source.deepCopy(env), authorizations, defaultVisibility.toArray()); } @Override - public boolean accept(Key k, Value v) { - Text testVis = k.getColumnVisibility(tmpVis); + protected boolean accept(Key k, Value v) { + ByteSequence testVis = k.getColumnVisibilityData(); - if (testVis.getLength() == 0 && defaultVisibility.getLength() == 0) + if (testVis.length() == 0 && defaultVisibility.length() == 0) return true; - else if (testVis.getLength() == 0) + else if
(testVis.length() == 0) testVis = defaultVisibility; Boolean b = (Boolean) cache.get(testVis); @@ -71,8 +73,8 @@ public class VisibilityFilter extends Filter { return b; try { - Boolean bb = ve.evaluate(new ColumnVisibility(testVis)); - cache.put(new Text(testVis), bb); + Boolean bb = ve.evaluate(new ColumnVisibility(testVis.toArray())); + cache.put(testVis, bb); return bb; } catch (VisibilityParseException e) { log.error("Parse Error", e); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/core/src/main/java/org/apache/accumulo/core/iterators/user/VisibilityFilter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/VisibilityFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/VisibilityFilter.java index f7007a1..e0c0b0f 100644 --- a/core/src/main/java/org/apache/accumulo/core/iterators/user/VisibilityFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/VisibilityFilter.java @@ -22,21 +22,30 @@ import java.io.IOException; import java.util.Map; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.OptionDescriber; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.VisibilityEvaluator; +import org.apache.accumulo.core.security.VisibilityParseException; import org.apache.accumulo.core.util.BadArgumentException; import org.apache.commons.collections.map.LRUMap; -import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; /** - * + * A 
SortedKeyValueIterator that filters based on ColumnVisibility. */ -public class VisibilityFilter extends org.apache.accumulo.core.iterators.system.VisibilityFilter { +public class VisibilityFilter extends Filter implements OptionDescriber { + + protected VisibilityEvaluator ve; + protected LRUMap cache; + + private static final Logger log = Logger.getLogger(VisibilityFilter.class); private static final String AUTHS = "auths"; private static final String FILTER_INVALID_ONLY = "filterInvalid"; @@ -46,9 +55,7 @@ public class VisibilityFilter extends org.apache.accumulo.core.iterators.system. /** * */ - public VisibilityFilter() { - super(); - } + public VisibilityFilter() {} @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { @@ -60,29 +67,45 @@ public class VisibilityFilter extends org.apache.accumulo.core.iterators.system. String auths = options.get(AUTHS); Authorizations authObj = auths == null || auths.isEmpty() ? 
new Authorizations() : new Authorizations(auths.getBytes(UTF_8)); this.ve = new VisibilityEvaluator(authObj); - this.defaultVisibility = new Text(); } this.cache = new LRUMap(1000); - this.tmpVis = new Text(); } @Override public boolean accept(Key k, Value v) { + ByteSequence testVis = k.getColumnVisibilityData(); if (filterInvalid) { - Text testVis = k.getColumnVisibility(tmpVis); Boolean b = (Boolean) cache.get(testVis); if (b != null) return b; try { - new ColumnVisibility(testVis); - cache.put(new Text(testVis), true); + new ColumnVisibility(testVis.toArray()); + cache.put(testVis, true); return true; } catch (BadArgumentException e) { - cache.put(new Text(testVis), false); + cache.put(testVis, false); return false; } } else { - return super.accept(k, v); + if (testVis.length() == 0) { + return true; + } + + Boolean b = (Boolean) cache.get(testVis); + if (b != null) + return b; + + try { + Boolean bb = ve.evaluate(new ColumnVisibility(testVis.toArray())); + cache.put(testVis, bb); + return bb; + } catch (VisibilityParseException e) { + log.error("Parse Error", e); + return false; + } catch (BadArgumentException e) { + log.error("Parse Error", e); + return false; + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java index 83b4615..7e1676a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReportingIterator.java @@ -31,9 +31,9 @@ import org.apache.accumulo.core.iterators.system.InterruptibleIterator; import org.apache.accumulo.server.AccumuloServerContext; public 
class ProblemReportingIterator implements InterruptibleIterator { - private SortedKeyValueIterator<Key,Value> source; + private final SortedKeyValueIterator<Key,Value> source; private boolean sawError = false; - private boolean continueOnError; + private final boolean continueOnError; private String resource; private String tableId; private final AccumuloServerContext context; http://git-wip-us.apache.org/repos/asf/accumulo/blob/b56a3349/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 9d5e0d0..29d5ca0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -683,13 +683,6 @@ public class InMemoryMap { private SourceSwitchingIterator ssi; private MemoryDataSource mds; - @Override - protected SortedKeyValueIterator<Key,Value> getSource() { - if (closed.get()) - throw new IllegalStateException("Memory iterator is closed"); - return super.getSource(); - } - private MemoryIterator(InterruptibleIterator source) { this(source, new AtomicBoolean(false)); }
