Repository: accumulo
Updated Branches:
  refs/heads/1.6 ff1e003ae -> 21d2f6152


ACCUMULO-4066 Speed up condition checking for conditional mutations


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/21d2f615
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/21d2f615
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/21d2f615

Branch: refs/heads/1.6
Commit: 21d2f61524de1f26ed88c4baaddee70c01081196
Parents: ff1e003
Author: Keith Turner <ktur...@apache.org>
Authored: Wed Jan 27 14:13:40 2016 -0500
Committer: Keith Turner <ktur...@apache.org>
Committed: Wed Jan 27 19:32:29 2016 -0500

----------------------------------------------------------------------
 .../core/client/impl/CompressedIterators.java   |  14 +-
 .../core/client/impl/ConditionalWriterImpl.java |  44 ++++-
 .../accumulo/core/iterators/IteratorUtil.java   |  93 ++++++-----
 .../client/impl/ConditionalComparatorTest.java  |  53 ++++++
 .../tserver/ConditionCheckerContext.java        | 164 +++++++++++++++++++
 .../org/apache/accumulo/tserver/Tablet.java     |  37 ++++-
 .../apache/accumulo/tserver/TabletServer.java   |  95 ++++-------
 .../accumulo/test/ConditionalWriterIT.java      | 137 +++++++++++++++-
 8 files changed, 521 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
index 3fcce90..96d58a7 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/CompressedIterators.java
@@ -24,15 +24,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.data.ArrayByteSequence;
-import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 
 public class CompressedIterators {
   private Map<String,Integer> symbolMap;
   private List<String> symbolTable;
-  private Map<ByteSequence,IterConfig> cache;
 
   public static class IterConfig {
     public List<IterInfo> ssiList = new ArrayList<IterInfo>();
@@ -46,7 +43,6 @@ public class CompressedIterators {
 
   public CompressedIterators(List<String> symbols) {
     this.symbolTable = symbols;
-    this.cache = new HashMap<ByteSequence,IterConfig>();
   }
 
   private int getSymbolID(String symbol) {
@@ -85,14 +81,7 @@ public class CompressedIterators {
   }
 
   public IterConfig decompress(ByteBuffer iterators) {
-
-    ByteSequence iterKey = new ArrayByteSequence(iterators);
-    IterConfig config = cache.get(iterKey);
-    if (config != null) {
-      return config;
-    }
-
-    config = new IterConfig();
+    IterConfig config = new IterConfig();
 
     UnsynchronizedBuffer.Reader in = new 
UnsynchronizedBuffer.Reader(iterators);
 
@@ -120,7 +109,6 @@ public class CompressedIterators {
 
     }
 
-    cache.put(iterKey, config);
     return config;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
index 730cf08..9030d77 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java
@@ -19,7 +19,9 @@ package org.apache.accumulo.core.client.impl;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -70,6 +72,7 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.BadArgumentException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -375,7 +378,7 @@ class ConditionalWriterImpl implements ConditionalWriter {
     this.credentials = credentials;
     this.auths = config.getAuthorizations();
     this.ve = new VisibilityEvaluator(config.getAuthorizations());
-    this.threadPool = new 
ScheduledThreadPoolExecutor(config.getMaxWriteThreads());
+    this.threadPool = new 
ScheduledThreadPoolExecutor(config.getMaxWriteThreads(), new 
NamingThreadFactory(this.getClass().getSimpleName()));
     this.locator = TabletLocator.getLocator(instance, new Text(tableId));
     this.serverQueues = new HashMap<String,ServerQueue>();
     this.tableId = tableId;
@@ -741,10 +744,47 @@ class ConditionalWriterImpl implements ConditionalWriter {
     }
   }
 
+  static class ConditionComparator implements Comparator<Condition> {
+
+    private static final Long MAX = Long.valueOf(Long.MAX_VALUE);
+
+    @Override
+    public int compare(Condition c1, Condition c2) {
+      int comp = c1.getFamily().compareTo(c2.getFamily());
+      if (comp == 0) {
+        comp = c1.getQualifier().compareTo(c2.getQualifier());
+        if (comp == 0) {
+          comp = c1.getVisibility().compareTo(c2.getVisibility());
+          if (comp == 0) {
+            Long l1 = c1.getTimestamp();
+            Long l2 = c2.getTimestamp();
+            if (l1 == null) {
+              l1 = MAX;
+            }
+
+            if (l2 == null) {
+              l2 = MAX;
+            }
+
+            comp = l2.compareTo(l1);
+          }
+        }
+      }
+
+      return comp;
+    }
+  }
+
+  private static final ConditionComparator CONDITION_COMPARATOR = new 
ConditionComparator();
+
   private List<TCondition> convertConditions(ConditionalMutation cm, 
CompressedIterators compressedIters) {
     List<TCondition> conditions = new 
ArrayList<TCondition>(cm.getConditions().size());
 
-    for (Condition cond : cm.getConditions()) {
+    // sort conditions in order to get better lookup performance. Sort on 
client side so tserver does not have to do it.
+    Condition[] ca = cm.getConditions().toArray(new 
Condition[cm.getConditions().size()]);
+    Arrays.sort(ca, CONDITION_COMPARATOR);
+
+    for (Condition cond : ca) {
       long ts = 0;
       boolean hasTs = false;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java 
b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
index 98392f6..6f76d77 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -108,21 +109,26 @@ public class IteratorUtil {
     return props;
   }
 
-  public static int getMaxPriority(IteratorScope scope, AccumuloConfiguration 
conf) {
-    List<IterInfo> iters = new ArrayList<IterInfo>();
-    parseIterConf(scope, iters, new HashMap<String,Map<String,String>>(), 
conf);
+  public static void mergeIteratorConfig(List<IterInfo> destList, 
Map<String,Map<String,String>> destOpts, List<IterInfo> tableIters,
+      Map<String,Map<String,String>> tableOpts, List<IterInfo> ssi, 
Map<String,Map<String,String>> ssio) {
+    destList.addAll(tableIters);
+    destList.addAll(ssi);
+    Collections.sort(destList, new IterInfoComparator());
 
-    int max = 0;
-
-    for (IterInfo iterInfo : iters) {
-      if (iterInfo.priority > max)
-        max = iterInfo.priority;
+    Set<Entry<String,Map<String,String>>> es = tableOpts.entrySet();
+    for (Entry<String,Map<String,String>> entry : es) {
+      if (entry.getValue() == null) {
+        destOpts.put(entry.getKey(), null);
+      } else {
+        destOpts.put(entry.getKey(), new 
HashMap<String,String>(entry.getValue()));
+      }
     }
 
-    return max;
+    IteratorUtil.mergeOptions(ssio, destOpts);
+
   }
 
-  protected static void parseIterConf(IteratorScope scope, List<IterInfo> 
iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
+  public static void parseIterConf(IteratorScope scope, List<IterInfo> iters, 
Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) {
     final Property scopeProperty = IteratorScope.getProperty(scope);
     final String scopePropertyKey = scopeProperty.getKey();
 
@@ -155,24 +161,6 @@ public class IteratorUtil {
     Collections.sort(iters, new IterInfoComparator());
   }
 
-  public static String findIterator(IteratorScope scope, String className, 
AccumuloConfiguration conf, Map<String,String> opts) {
-    ArrayList<IterInfo> iters = new ArrayList<IterInfo>();
-    Map<String,Map<String,String>> allOptions = new 
HashMap<String,Map<String,String>>();
-
-    parseIterConf(scope, iters, allOptions, conf);
-
-    for (IterInfo iterInfo : iters)
-      if (iterInfo.className.equals(className)) {
-        Map<String,String> tmpOpts = allOptions.get(iterInfo.iterName);
-        if (tmpOpts != null) {
-          opts.putAll(tmpOpts);
-        }
-        return iterInfo.iterName;
-      }
-
-    return null;
-  }
-
   public static <K extends WritableComparable<?>,V extends Writable> 
SortedKeyValueIterator<K,V> loadIterators(IteratorScope scope,
       SortedKeyValueIterator<K,V> source, KeyExtent extent, 
AccumuloConfiguration conf, IteratorEnvironment env) throws IOException {
     List<IterInfo> emptyList = Collections.emptyList();
@@ -209,6 +197,12 @@ public class IteratorUtil {
 
     parseIterConf(scope, iters, allOptions, conf);
 
+    mergeOptions(ssio, allOptions);
+
+    return loadIterators(source, iters, allOptions, env, 
useAccumuloClassLoader, conf.get(Property.TABLE_CLASSPATH));
+  }
+
+  private static void mergeOptions(Map<String,Map<String,String>> ssio, 
Map<String,Map<String,String>> allOptions) {
     for (Entry<String,Map<String,String>> entry : ssio.entrySet()) {
       if (entry.getValue() == null)
         continue;
@@ -219,30 +213,35 @@ public class IteratorUtil {
         options.putAll(entry.getValue());
       }
     }
-
-    return loadIterators(source, iters, allOptions, env, 
useAccumuloClassLoader, conf.get(Property.TABLE_CLASSPATH));
   }
 
-  @SuppressWarnings("unchecked")
   public static <K extends WritableComparable<?>,V extends Writable> 
SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
       Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, 
IteratorEnvironment env, boolean useAccumuloClassLoader, String context)
       throws IOException {
+    return loadIterators(source, iters, iterOpts, env, useAccumuloClassLoader, 
context, null);
+  }
+
+  public static <K extends WritableComparable<?>,V extends Writable> 
SortedKeyValueIterator<K,V> loadIterators(SortedKeyValueIterator<K,V> source,
+      Collection<IterInfo> iters, Map<String,Map<String,String>> iterOpts, 
IteratorEnvironment env, boolean useAccumuloClassLoader, String context,
+      Map<String,Class<? extends SortedKeyValueIterator<K,V>>> classCache) 
throws IOException {
     // wrap the source in a SynchronizedIterator in case any of the additional 
configured iterators want to use threading
     SortedKeyValueIterator<K,V> prev = new SynchronizedIterator<K,V>(source);
 
     try {
       for (IterInfo iterInfo : iters) {
 
-        Class<? extends SortedKeyValueIterator<K,V>> clazz;
-        if (useAccumuloClassLoader) {
-          if (context != null && !context.equals(""))
-            clazz = (Class<? extends SortedKeyValueIterator<K,V>>) 
AccumuloVFSClassLoader.getContextManager().loadClass(context, 
iterInfo.className,
-                SortedKeyValueIterator.class);
-          else
-            clazz = (Class<? extends SortedKeyValueIterator<K,V>>) 
AccumuloVFSClassLoader.loadClass(iterInfo.className, 
SortedKeyValueIterator.class);
+        Class<? extends SortedKeyValueIterator<K,V>> clazz = null;
+        if (classCache != null) {
+          clazz = classCache.get(iterInfo.className);
+
+          if (clazz == null) {
+            clazz = loadClass(useAccumuloClassLoader, context, iterInfo);
+            classCache.put(iterInfo.className, clazz);
+          }
         } else {
-          clazz = (Class<? extends SortedKeyValueIterator<K,V>>) 
Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class);
+          clazz = loadClass(useAccumuloClassLoader, context, iterInfo);
         }
+
         SortedKeyValueIterator<K,V> skvi = clazz.newInstance();
 
         Map<String,String> options = iterOpts.get(iterInfo.iterName);
@@ -266,6 +265,22 @@ public class IteratorUtil {
     return prev;
   }
 
+  @SuppressWarnings("unchecked")
+  private static <K extends WritableComparable<?>,V extends Writable> Class<? 
extends SortedKeyValueIterator<K,V>> loadClass(boolean useAccumuloClassLoader,
+      String context, IterInfo iterInfo) throws ClassNotFoundException, 
IOException {
+    Class<? extends SortedKeyValueIterator<K,V>> clazz;
+    if (useAccumuloClassLoader) {
+      if (context != null && !context.equals(""))
+        clazz = (Class<? extends SortedKeyValueIterator<K,V>>) 
AccumuloVFSClassLoader.getContextManager().loadClass(context, 
iterInfo.className,
+            SortedKeyValueIterator.class);
+      else
+        clazz = (Class<? extends SortedKeyValueIterator<K,V>>) 
AccumuloVFSClassLoader.loadClass(iterInfo.className, 
SortedKeyValueIterator.class);
+    } else {
+      clazz = (Class<? extends SortedKeyValueIterator<K,V>>) 
Class.forName(iterInfo.className).asSubclass(SortedKeyValueIterator.class);
+    }
+    return clazz;
+  }
+
   public static Range maximizeStartKeyTimeStamp(Range range) {
     Range seekRange = range;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java
 
b/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java
new file mode 100644
index 0000000..5413893
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/impl/ConditionalComparatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.impl;
+
+import 
org.apache.accumulo.core.client.impl.ConditionalWriterImpl.ConditionComparator;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConditionalComparatorTest {
+  @Test
+  public void testComparator() {
+    Condition c1 = new Condition("a", "b");
+    Condition c2 = new Condition("a", "c");
+    Condition c3 = new Condition("b", "c");
+    Condition c4 = new Condition("a", "b").setTimestamp(5);
+    Condition c5 = new Condition("a", "b").setTimestamp(6);
+    Condition c6 = new Condition("a", "b").setVisibility(new 
ColumnVisibility("A&B"));
+    Condition c7 = new Condition("a", "b").setVisibility(new 
ColumnVisibility("A&C"));
+
+    ConditionComparator comparator = new ConditionComparator();
+
+    Assert.assertTrue(comparator.compare(c1, c1) == 0);
+    Assert.assertTrue(comparator.compare(c1, c2) < 0);
+    Assert.assertTrue(comparator.compare(c2, c1) > 0);
+    Assert.assertTrue(comparator.compare(c1, c3) < 0);
+    Assert.assertTrue(comparator.compare(c3, c1) > 0);
+    Assert.assertTrue(comparator.compare(c1, c4) < 0);
+    Assert.assertTrue(comparator.compare(c4, c1) > 0);
+    Assert.assertTrue(comparator.compare(c5, c4) < 0);
+    Assert.assertTrue(comparator.compare(c4, c5) > 0);
+    Assert.assertTrue(comparator.compare(c1, c7) < 0);
+    Assert.assertTrue(comparator.compare(c7, c1) > 0);
+    Assert.assertTrue(comparator.compare(c6, c7) < 0);
+    Assert.assertTrue(comparator.compare(c7, c6) > 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
new file mode 100644
index 0000000..2e34f38
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.impl.CompressedIterators;
+import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.data.thrift.TCMResult;
+import org.apache.accumulo.core.data.thrift.TCMStatus;
+import org.apache.accumulo.core.data.thrift.TCondition;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.tserver.data.ServerConditionalMutation;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+public class ConditionCheckerContext {
+  private CompressedIterators compressedIters;
+
+  private List<IterInfo> tableIters;
+  private Map<String,Map<String,String>> tableIterOpts;
+  private TabletIteratorEnvironment tie;
+  private String context;
+  private Map<String,Class<? extends SortedKeyValueIterator<Key,Value>>> 
classCache;
+
+  private static class MergedIterConfig {
+    List<IterInfo> mergedIters;
+    Map<String,Map<String,String>> mergedItersOpts;
+
+    MergedIterConfig(List<IterInfo> mergedIters, 
Map<String,Map<String,String>> mergedItersOpts) {
+      this.mergedIters = mergedIters;
+      this.mergedItersOpts = mergedItersOpts;
+    }
+  }
+
+  private Map<ByteSequence,MergedIterConfig> mergedIterCache = new 
HashMap<ByteSequence,MergedIterConfig>();
+
+  ConditionCheckerContext(CompressedIterators compressedIters, 
AccumuloConfiguration tableConf) {
+    this.compressedIters = compressedIters;
+
+    tableIters = new ArrayList<IterInfo>();
+    tableIterOpts = new HashMap<String,Map<String,String>>();
+
+    // parse table iterator config once
+    IteratorUtil.parseIterConf(IteratorScope.scan, tableIters, tableIterOpts, 
tableConf);
+
+    context = tableConf.get(Property.TABLE_CLASSPATH);
+
+    classCache = new HashMap<String,Class<? extends 
SortedKeyValueIterator<Key,Value>>>();
+
+    tie = new TabletIteratorEnvironment(IteratorScope.scan, tableConf);
+  }
+
+  SortedKeyValueIterator<Key,Value> 
buildIterator(SortedKeyValueIterator<Key,Value> systemIter, TCondition tc) 
throws IOException {
+
+    ArrayByteSequence key = new ArrayByteSequence(tc.iterators);
+    MergedIterConfig mic = mergedIterCache.get(key);
+    if (mic == null) {
+      IterConfig ic = compressedIters.decompress(tc.iterators);
+
+      List<IterInfo> mergedIters = new ArrayList<IterInfo>(tableIters.size() + 
ic.ssiList.size());
+      Map<String,Map<String,String>> mergedItersOpts = new 
HashMap<String,Map<String,String>>(tableIterOpts.size() + ic.ssio.size());
+
+      IteratorUtil.mergeIteratorConfig(mergedIters, mergedItersOpts, 
tableIters, tableIterOpts, ic.ssiList, ic.ssio);
+
+      mic = new MergedIterConfig(mergedIters, mergedItersOpts);
+
+      mergedIterCache.put(key, mic);
+    }
+
+    return IteratorUtil.loadIterators(systemIter, mic.mergedIters, 
mic.mergedItersOpts, tie, true, context, classCache);
+  }
+
+  boolean checkConditions(SortedKeyValueIterator<Key,Value> systemIter, 
ServerConditionalMutation scm) throws IOException {
+    boolean add = true;
+
+    for (TCondition tc : scm.getConditions()) {
+
+      Range range;
+      if (tc.hasTimestamp)
+        range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new 
Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
+      else
+        range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new 
Text(tc.getCq()), new Text(tc.getCv()));
+
+      SortedKeyValueIterator<Key,Value> iter = buildIterator(systemIter, tc);
+
+      ByteSequence cf = new ArrayByteSequence(tc.getCf());
+      iter.seek(range, Collections.singleton(cf), true);
+      Value val = null;
+      if (iter.hasTop()) {
+        val = iter.getTopValue();
+      }
+
+      if ((val == null ^ tc.getVal() == null) || (val != null && 
!Arrays.equals(tc.getVal(), val.get()))) {
+        add = false;
+        break;
+      }
+    }
+    return add;
+  }
+
+  public class ConditionChecker {
+
+    private List<ServerConditionalMutation> conditionsToCheck;
+    private List<ServerConditionalMutation> okMutations;
+    private List<TCMResult> results;
+    private boolean checked = false;
+
+    public ConditionChecker(List<ServerConditionalMutation> conditionsToCheck, 
List<ServerConditionalMutation> okMutations, List<TCMResult> results) {
+      this.conditionsToCheck = conditionsToCheck;
+      this.okMutations = okMutations;
+      this.results = results;
+    }
+
+    public void check(SortedKeyValueIterator<Key,Value> systemIter) throws 
IOException {
+      Preconditions.checkArgument(!checked, "check() method should only be 
called once");
+      checked = true;
+
+      for (ServerConditionalMutation scm : conditionsToCheck) {
+        if (checkConditions(systemIter, scm)) {
+          okMutations.add(scm);
+        } else {
+          results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
+        }
+      }
+    }
+  }
+
+  public ConditionChecker newChecker(List<ServerConditionalMutation> 
conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> 
results) {
+    return new ConditionChecker(conditionsToCheck, okMutations, results);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 3f00c0b..ca4bde6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -128,6 +128,7 @@ import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.tserver.Compactor.CompactionCanceledException;
 import org.apache.accumulo.tserver.Compactor.CompactionEnv;
+import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
 import org.apache.accumulo.tserver.FileManager.ScanFileManager;
 import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator;
 import org.apache.accumulo.tserver.TabletServer.TservConstraintEnv;
@@ -1597,6 +1598,23 @@ public class Tablet {
     void receive(List<KVEntry> matches) throws IOException;
   }
 
+  public void checkConditions(ConditionChecker checker, Authorizations 
authorizations, AtomicBoolean iFlag) throws IOException {
+
+    ScanDataSource dataSource = new ScanDataSource(this, authorizations, 
this.defaultSecurityLabel, iFlag);
+
+    try {
+      SortedKeyValueIterator<Key,Value> iter = new 
SourceSwitchingIterator(dataSource);
+      checker.check(iter);
+    } catch (IOException ioe) {
+      dataSource.close(true);
+      throw ioe;
+    } finally {
+      // close in a finally block because we always want
+      // to return mapfiles, even when an exception is thrown
+      dataSource.close(false);
+    }
+  }
+
   class LookupResult {
     List<Range> unfinishedRanges = new ArrayList<Range>();
     long bytesAdded = 0;
@@ -1929,18 +1947,29 @@ public class Tablet {
     private StatsIterator statsIterator;
 
     ScanOptions options;
+    private final boolean loadIters;
 
     ScanDataSource(Authorizations authorizations, byte[] defaultLabels, 
HashSet<Column> columnSet, List<IterInfo> ssiList, 
Map<String,Map<String,String>> ssio,
         AtomicBoolean interruptFlag) {
       expectedDeletionCount = dataSourceDeletions.get();
       this.options = new ScanOptions(-1, authorizations, defaultLabels, 
columnSet, ssiList, ssio, interruptFlag, false);
       this.interruptFlag = interruptFlag;
+      this.loadIters = true;
     }
 
     ScanDataSource(ScanOptions options) {
       expectedDeletionCount = dataSourceDeletions.get();
       this.options = options;
       this.interruptFlag = options.interruptFlag;
+      this.loadIters = true;
+    }
+
+    ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] 
defaultLabels, AtomicBoolean iFlag) {
+      expectedDeletionCount = dataSourceDeletions.get();
+      Set<Column> emptycols = Collections.emptySet();
+      this.options = new ScanOptions(-1, authorizations, defaultLabels, 
emptycols, null, null, iFlag, false);
+      this.interruptFlag = iFlag;
+      this.loadIters = false;
     }
 
     @Override
@@ -2035,8 +2064,12 @@ public class Tablet {
 
       VisibilityFilter visFilter = new VisibilityFilter(colFilter, 
options.authorizations, options.defaultLabels);
 
-      return iterEnv.getTopLevelIterator(IteratorUtil
-          .loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, 
options.ssiList, options.ssio, iterEnv));
+      if (!loadIters) {
+        return visFilter;
+      } else {
+        return 
iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, 
visFilter, extent, acuTableConf, options.ssiList, options.ssio,
+            iterEnv));
+      }
     }
 
     private void close(boolean sawErrors) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 4e3e00b..6023ae3 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -67,7 +67,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.CompressedIterators;
-import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.TabletLocator;
@@ -99,7 +98,6 @@ import org.apache.accumulo.core.data.thrift.ScanResult;
 import org.apache.accumulo.core.data.thrift.TCMResult;
 import org.apache.accumulo.core.data.thrift.TCMStatus;
 import org.apache.accumulo.core.data.thrift.TColumn;
-import org.apache.accumulo.core.data.thrift.TCondition;
 import org.apache.accumulo.core.data.thrift.TConditionalMutation;
 import org.apache.accumulo.core.data.thrift.TConditionalSession;
 import org.apache.accumulo.core.data.thrift.TKey;
@@ -199,6 +197,7 @@ import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.accumulo.tserver.Compactor.CompactionInfo;
+import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
 import org.apache.accumulo.tserver.RowLocks.RowLock;
 import org.apache.accumulo.tserver.Tablet.CommitSession;
 import org.apache.accumulo.tserver.Tablet.KVEntry;
@@ -1930,79 +1929,57 @@ public class TabletServer extends AbstractMetricsImpl 
implements org.apache.accu
         List<String> symbols) throws IOException {
       Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = 
updates.entrySet().iterator();
 
-      CompressedIterators compressedIters = new CompressedIterators(symbols);
+      final CompressedIterators compressedIters = new 
CompressedIterators(symbols);
+      ConditionCheckerContext checkerContext = new 
ConditionCheckerContext(compressedIters, 
ServerConfiguration.getTableConfiguration(instance, cs.tableId));
 
       while (iter.hasNext()) {
-        Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
-        Tablet tablet = onlineTablets.get(entry.getKey());
+        final Entry<KeyExtent,List<ServerConditionalMutation>> entry = 
iter.next();
+        final Tablet tablet = onlineTablets.get(entry.getKey());
 
         if (tablet == null || tablet.isClosed()) {
           for (ServerConditionalMutation scm : entry.getValue())
             results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
           iter.remove();
         } else {
-          List<ServerConditionalMutation> okMutations = new 
ArrayList<ServerConditionalMutation>(entry.getValue().size());
+          final List<ServerConditionalMutation> okMutations = new 
ArrayList<ServerConditionalMutation>(entry.getValue().size());
+          final List<TCMResult> resultsSubList = 
results.subList(results.size(), results.size());
 
-          for (ServerConditionalMutation scm : entry.getValue()) {
-            if (checkCondition(results, cs, compressedIters, tablet, scm))
-              okMutations.add(scm);
-          }
-
-          entry.setValue(okMutations);
-        }
-
-      }
-    }
-
-    boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession 
cs, CompressedIterators compressedIters, Tablet tablet,
-        ServerConditionalMutation scm) throws IOException {
-      boolean add = true;
-
-      Set<Column> emptyCols = Collections.emptySet();
-
-      for (TCondition tc : scm.getConditions()) {
-
-        Range range;
-        if (tc.hasTimestamp)
-          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), 
new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs());
-        else
-          range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), 
new Text(tc.getCq()), new Text(tc.getCv()));
-
-        IterConfig ic = compressedIters.decompress(tc.iterators);
-
-        Scanner scanner = tablet.createScanner(range, 1, emptyCols, cs.auths, 
ic.ssiList, ic.ssio, false, cs.interruptFlag);
+          ConditionChecker checker = 
checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList);
+          try {
+            tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
 
-        try {
-          ScanBatch batch = scanner.read();
+            if (okMutations.size() > 0) {
+              entry.setValue(okMutations);
+            } else {
+              iter.remove();
+            }
+          } catch (TabletClosedException e) {
+            // clear anything added while checking conditions.
+            resultsSubList.clear();
 
-          Value val = null;
+            for (ServerConditionalMutation scm : entry.getValue()) {
+              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+            }
+            iter.remove();
+          } catch (IterationInterruptedException e) {
+            // clear anything added while checking conditions.
+            resultsSubList.clear();
 
-          for (KVEntry entry2 : batch.results) {
-            val = entry2.getValue();
-            break;
-          }
+            for (ServerConditionalMutation scm : entry.getValue()) {
+              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+            }
+            iter.remove();
+          } catch (TooManyFilesException e) {
+            // clear anything added while checking conditions.
+            resultsSubList.clear();
 
-          if ((val == null ^ tc.getVal() == null) || (val != null && 
!Arrays.equals(tc.getVal(), val.get()))) {
-            results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED));
-            add = false;
-            break;
+            for (ServerConditionalMutation scm : entry.getValue()) {
+              results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
+            }
+            iter.remove();
           }
-
-        } catch (TabletClosedException e) {
-          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          add = false;
-          break;
-        } catch (IterationInterruptedException iie) {
-          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          add = false;
-          break;
-        } catch (TooManyFilesException tmfe) {
-          results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
-          add = false;
-          break;
         }
       }
-      return add;
     }
 
     private void 
writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> 
updates, ArrayList<TCMResult> results, ConditionalSession sess) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/21d2f615/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java 
b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
index 5bfaebf..d32bc43 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@ -19,9 +19,11 @@ package org.apache.accumulo.test;
 
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -63,7 +65,11 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.LongCombiner.Type;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.security.Authorizations;
@@ -90,6 +96,7 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.Iterables;
 
 /**
@@ -501,7 +508,135 @@ public class ConditionalWriterIT extends 
AccumuloClusterIT {
 
     Assert.assertEquals(expected, actual);
 
-    // TODO test w/ table that has iterators configured
+    cw.close();
+  }
+
+  public static class AddingIterator extends WrappingIterator {
+    long amount = 0;
+
+    @Override
+    public Value getTopValue() {
+      Value val = super.getTopValue();
+      long l = Long.parseLong(val.toString());
+      String newVal = (l + amount) + "";
+      return new Value(newVal.getBytes(Charsets.UTF_8));
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
+      this.setSource(source);
+      amount = Long.parseLong(options.get("amount"));
+    }
+  }
+
+  public static class MultiplyingIterator extends WrappingIterator {
+    long amount = 0;
+
+    @Override
+    public Value getTopValue() {
+      Value val = super.getTopValue();
+      long l = Long.parseLong(val.toString());
+      String newVal = l * amount + "";
+      return new Value(newVal.getBytes(Charsets.UTF_8));
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
+      this.setSource(source);
+      amount = Long.parseLong(options.get("amount"));
+    }
+  }
+
+  @Test
+  public void testTableAndConditionIterators() throws Exception {
+
+    // test w/ table that has iterators configured
+    Connector conn = getConnector();
+    String tableName = getUniqueNames(1)[0];
+
+    IteratorSetting aiConfig1 = new IteratorSetting(30, "AI1", 
AddingIterator.class);
+    aiConfig1.addOption("amount", "2");
+    IteratorSetting aiConfig2 = new IteratorSetting(35, "MI1", 
MultiplyingIterator.class);
+    aiConfig2.addOption("amount", "3");
+    IteratorSetting aiConfig3 = new IteratorSetting(40, "AI2", 
AddingIterator.class);
+    aiConfig3.addOption("amount", "5");
+
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new 
BatchWriterConfig());
+
+    Mutation m = new Mutation("ACCUMULO-1000");
+    m.put("count", "comments", "6");
+    bw.addMutation(m);
+
+    m = new Mutation("ACCUMULO-1001");
+    m.put("count", "comments", "7");
+    bw.addMutation(m);
+
+    m = new Mutation("ACCUMULO-1002");
+    m.put("count", "comments", "8");
+    bw.addMutation(m);
+
+    bw.close();
+
+    conn.tableOperations().attachIterator(tableName, aiConfig1, 
EnumSet.of(IteratorScope.scan));
+    conn.tableOperations().offline(tableName, true);
+    conn.tableOperations().online(tableName, true);
+
+    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig());
+
+    ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setValue("8"));
+    cm6.put("count", "comments", "7");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
+
+    Scanner scanner = conn.createScanner(tableName, new Authorizations());
+    scanner.setRange(new Range("ACCUMULO-1000"));
+    scanner.fetchColumn(new Text("count"), new Text("comments"));
+
+    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
+    Assert.assertEquals("9", entry.getValue().toString());
+
+    ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(aiConfig2).setValue("27"));
+    cm7.put("count", "comments", "8");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
+
+    entry = Iterables.getOnlyElement(scanner);
+    Assert.assertEquals("10", entry.getValue().toString());
+
+    ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(aiConfig2, 
aiConfig3).setValue("35"));
+    cm8.put("count", "comments", "9");
+    Assert.assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus());
+
+    entry = Iterables.getOnlyElement(scanner);
+    Assert.assertEquals("11", entry.getValue().toString());
+
+    ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(aiConfig2).setValue("33"));
+    cm3.put("count", "comments", "3");
+
+    ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new 
Condition("count", "comments").setIterators(aiConfig3).setValue("14"));
+    cm4.put("count", "comments", "3");
+
+    ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new 
Condition("count", "comments").setIterators(aiConfig3).setValue("10"));
+    cm5.put("count", "comments", "3");
+
+    Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, 
cm5).iterator());
+    Map<String,Status> actual = new HashMap<String,Status>();
+
+    while (results.hasNext()) {
+      Result result = results.next();
+      String k = new String(result.getMutation().getRow());
+      Assert.assertFalse("Did not expect to see multiple results for the row: 
" + k, actual.containsKey(k));
+      actual.put(k, result.getStatus());
+    }
+
+    cw.close();
+
+    Map<String,Status> expected = new HashMap<String,Status>();
+    expected.put("ACCUMULO-1000", Status.ACCEPTED);
+    expected.put("ACCUMULO-1001", Status.ACCEPTED);
+    expected.put("ACCUMULO-1002", Status.REJECTED);
+
+    Assert.assertEquals(expected, actual);
 
     cw.close();
   }

Reply via email to