This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new 71a788e  Added perf test for yielding+scan dispatching (#56)
71a788e is described below

commit 71a788e8c94e0e2e1b7afcfc98b8bb7797d00504
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Feb 8 15:05:06 2019 -0500

    Added perf test for yielding+scan dispatching (#56)
    
    Test running lots of short scans while many filters that return little
    data are running in the background.  If nothing is done these filters
    will prevent the short scans from running.  This test configures
    Accumulo so that the short scans should get a chance to run.  It does
    three things to facilitate the short scans : the filters yield, there
    are two scan executors, and a scan dispatcher sends long running scans
    to the second executor. If yielding and dispatching are working
    correctly then the short scans should have very short response times.
    This happens because the filters should end up in a separate thread pool
    than the short scan.
    
    Also fixed some bugs with the performance test framework.
---
 README.md                                          |   5 +
 bin/performance                                    |  10 +-
 conf/cluster-control.sh.uno                        |   6 +-
 contrib/import-control.xml                         |   4 +
 pom.xml                                            |   2 +-
 .../performance/tests/ProbabilityFilter.java       |  29 +++
 .../performance/tests/TimedScanDispatcher.java     |  29 +++
 .../testing/performance/tests/YieldingFilter.java  |  85 ++++++++
 .../performance/tests/YieldingScanExecutorPT.java  | 227 +++++++++++++++++++++
 9 files changed, 392 insertions(+), 5 deletions(-)

diff --git a/README.md b/README.md
index f9acb26..463c52c 100644
--- a/README.md
+++ b/README.md
@@ -165,6 +165,11 @@ To run performance test a `cluster-control.sh` script is 
needed to assist with s
 wiping, and confguring an Accumulo instance.  This script should define the 
following functions.
 
 ```bash
+
+function get_hadoop_client {
+  # TODO return hadoop client libs in a form suitable for appending to a 
classpath
+}
+
 function get_version {
   case $1 in
     ACCUMULO)
diff --git a/bin/performance b/bin/performance
index d062ecf..20d8e8d 100755
--- a/bin/performance
+++ b/bin/performance
@@ -28,12 +28,13 @@ Possible commands:
   run <output dir> [filter]        Runs performance tests.
   compare <result 1> <result 2>    Compares results of two test.
   csv {files}                      Converts results to CSV
+  list                             List the performance test
 EOF
 }
 
 
 function build_shade_jar() {
-  at_shaded_jar="$at_home/core/target/accumulo-testing-$at_version-shaded.jar"
+  at_shaded_jar="$at_home/target/accumulo-testing-shaded.jar"
   if [ ! -f "$at_shaded_jar" ]; then
     echo "Building $at_shaded_jar"
     cd "$at_home" || exit 1
@@ -58,7 +59,7 @@ fi
 
 . $at_home/conf/cluster-control.sh
 build_shade_jar
-CP="$at_home/core/target/accumulo-testing-$at_version-shaded.jar"
+CP="$at_home/target/accumulo-testing-shaded.jar:$(get_hadoop_client)"
 perf_pkg="org.apache.accumulo.testing.performance.impl"
 case "$1" in
   run)
@@ -77,7 +78,7 @@ case "$1" in
         get_config_file accumulo.properties "$pt_tmp"
         CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  
${perf_pkg}.MergeSiteConfig "$test_class" "$pt_tmp"
         put_config_file "$pt_tmp/accumulo.properties"
-        put_server_code 
"$at_home/core/target/accumulo-testing-core-$at_version.jar"
+        put_server_code "$at_home/target/accumulo-testing-$at_version.jar"
         start_accumulo
         get_config_file accumulo-client.properties "$pt_tmp"
         CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  
${perf_pkg}.PerfTestRunner "$pt_tmp/accumulo-client.properties" "$test_class" 
"$(get_version 'ACCUMULO')" "$2"
@@ -91,6 +92,9 @@ case "$1" in
   csv)
     CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  
${perf_pkg}.Csv "${@:2}"
     ;;
+  list)
+    CLASSPATH="$CP" java -Dlog4j.configuration="file:$log4j_config"  
${perf_pkg}.ListTests
+    ;;
   *)
     echo "Unknown command : $1"
     print_usage
diff --git a/conf/cluster-control.sh.uno b/conf/cluster-control.sh.uno
index 678ee35..a98d520 100644
--- a/conf/cluster-control.sh.uno
+++ b/conf/cluster-control.sh.uno
@@ -22,9 +22,13 @@ function get_ah {
   echo "$($UNO env | grep ACCUMULO_HOME | sed 's/export ACCUMULO_HOME=//' | 
sed 's/"//g')"
 }
 
-
 # functions required for accumulo testing cluster control
 
+function get_hadoop_client {
+  echo "$($UNO env | grep HADOOP_HOME | sed 's/export HADOOP_HOME=//' | sed 
's/"//g')/share/hadoop/client/*"
+}
+
+
 function get_version {
   case $1 in
     ACCUMULO)
diff --git a/contrib/import-control.xml b/contrib/import-control.xml
index ce449fc..af247c4 100644
--- a/contrib/import-control.xml
+++ b/contrib/import-control.xml
@@ -29,6 +29,10 @@
     <allow pkg="org.apache.accumulo.minicluster"/>
     <allow pkg="org.apache.accumulo.hadoop.mapreduce"/>
 
+    <!-- SPI package -->
+    <allow pkg="org.apache.accumulo.core.spi"/>
+
+
     <!-- exceptions for testing -->
     <allow pkg="org.apache.accumulo.core.conf"/>
 
diff --git a/pom.xml b/pom.xml
index a1a07be..b1a606c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
   <description>Testing tools for Apache Accumulo</description>
 
   <properties>
-    <accumulo.version>2.0.0-SNAPSHOT</accumulo.version>
+    <accumulo.version>2.0.0-alpha-2</accumulo.version>
     <hadoop.version>3.0.3</hadoop.version>
     <zookeeper.version>3.4.9</zookeeper.version>
     <slf4j.version>1.7.21</slf4j.version>
diff --git 
a/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
 
b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
new file mode 100644
index 0000000..aa807e0
--- /dev/null
+++ 
b/src/main/java/org/apache/accumulo/testing/performance/tests/ProbabilityFilter.java
@@ -0,0 +1,29 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.BiPredicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+public class ProbabilityFilter extends YieldingFilter {
+
+  private double matchProbability;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    this.matchProbability = Double.parseDouble(options.get("probability"));
+  }
+
+  @Override
+  protected BiPredicate<Key, Value> createPredicate() {
+    Random rand = new Random();
+    return (k,v) -> rand.nextDouble() < matchProbability;
+  }
+}
diff --git 
a/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java
 
b/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java
new file mode 100644
index 0000000..7058bf9
--- /dev/null
+++ 
b/src/main/java/org/apache/accumulo/testing/performance/tests/TimedScanDispatcher.java
@@ -0,0 +1,29 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import org.apache.accumulo.core.spi.scan.ScanDispatcher;
+import org.apache.accumulo.core.spi.scan.ScanInfo;
+
+public class TimedScanDispatcher implements ScanDispatcher {
+
+  String quickExecutor;
+  long quickTime;
+
+  String longExectuor;
+
+  public void init(InitParameters params) {
+    quickExecutor = params.getOptions().get("quick.executor");
+    quickTime = Long.parseLong(params.getOptions().get("quick.time.ms"));
+
+    longExectuor = params.getOptions().get("long.executor");
+  }
+
+  @Override
+  public String dispatch(DispatchParmaters params) {
+    ScanInfo scanInfo = params.getScanInfo();
+
+    if (scanInfo.getRunTimeStats().sum() < quickTime)
+      return quickExecutor;
+
+    return longExectuor;
+  }
+}
diff --git 
a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
 
b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
new file mode 100644
index 0000000..f514059
--- /dev/null
+++ 
b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingFilter.java
@@ -0,0 +1,85 @@
+package org.apache.accumulo.testing.performance.tests;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.BiPredicate;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.YieldCallback;
+import org.slf4j.LoggerFactory;
+
+public abstract class YieldingFilter implements 
SortedKeyValueIterator<Key,Value> {
+
+  private SortedKeyValueIterator<Key,Value> source;
+  private BiPredicate<Key,Value> predicate;
+  private YieldCallback<Key> yield;
+  private long yieldTime;
+
+  protected abstract BiPredicate<Key,Value> createPredicate();
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    this.source = source;
+    this.predicate = createPredicate();
+    this.yieldTime = Long.parseLong(options.getOrDefault("yieldTimeMS", 
"100"));
+  }
+
+  protected void findTop() throws IOException {
+    long start = System.nanoTime();
+    while (source.hasTop() && !source.getTopKey().isDeleted()
+        && !predicate.test(source.getTopKey(), source.getTopValue())) {
+      long duration = (System.nanoTime() - start) / 1000000;
+      if (duration > yieldTime) {
+        yield.yield(source.getTopKey());
+        break;
+      }
+
+      source.next();
+    }
+  }
+
+  @Override
+  public boolean hasTop() {
+    return !yield.hasYielded() && source.hasTop();
+  }
+
+  @Override
+  public void next() throws IOException {
+    source.next();
+    findTop();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+      throws IOException {
+    source.seek(range, columnFamilies, inclusive);
+    findTop();
+  }
+
+  @Override
+  public Key getTopKey() {
+    return source.getTopKey();
+  }
+
+  @Override
+  public Value getTopValue() {
+    return source.getTopValue();
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void enableYielding(YieldCallback<Key> callback) {
+    this.yield = callback;
+  }
+}
diff --git 
a/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
 
b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
new file mode 100644
index 0000000..dd94d37
--- /dev/null
+++ 
b/src/main/java/org/apache/accumulo/testing/performance/tests/YieldingScanExecutorPT.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.performance.tests;
+
+import java.util.HashMap;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.performance.Environment;
+import org.apache.accumulo.testing.performance.PerformanceTest;
+import org.apache.accumulo.testing.performance.Report;
+import org.apache.accumulo.testing.performance.SystemConfiguration;
+import org.apache.accumulo.testing.performance.util.TestData;
+import org.apache.accumulo.testing.performance.util.TestExecutor;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class YieldingScanExecutorPT implements PerformanceTest {
+
+  private static final int NUM_SHORT_SCANS_THREADS = 5;
+  private static final int NUM_LONG_SCANS = 50;
+
+  private static final int NUM_ROWS = 100000;
+  private static final int NUM_FAMS = 10;
+  private static final int NUM_QUALS = 10;
+
+  private static final String SCAN_EXECUTOR_THREADS = "2";
+
+  private static final String TEST_DESC = "Scan Executor Test.  Test running 
lots of short scans "
+      + "while many filters that return little data are running in the 
background.  If nothing is "
+      + "done these filters will prevent the short scans from running.  This 
test configures "
+      + "Accumulo so that the short scans should get a chance to run.  It does 
three things to "
+      + "facilitate the short scans : the filters yield, there a two scan 
executors, and a scan "
+      + "dispatcher sends long running scans to the second executor. If 
yielding and dispatching "
+      + "are working correctly then the short scans should have very short 
response times.  This "
+      + "happens because the filters should end up in a separate thread pool 
than the short scan.";
+
+  private static final String FILTER_PROBABILITY = "0.000001";
+  private static final String FILTER_YIELD_TIME = "1000";
+
+  private static final String QUICK_SCAN_TIME = "500";
+
+  @Override
+  public SystemConfiguration getSystemConfig() {
+    Map<String,String> siteCfg = new HashMap<>();
+
+    siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200");
+    siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200");
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads",
+        SCAN_EXECUTOR_THREADS);
+    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads",
+        SCAN_EXECUTOR_THREADS);
+    return new SystemConfiguration().setAccumuloConfig(siteCfg);
+  }
+
+  @Override
+  public Report runTest(Environment env) throws Exception {
+
+    String tableName = "scept";
+
+    Map<String,String> props = new HashMap<>();
+    // set up a scan dispatcher that send long runnning scans (> 500ms) to the 
second executor
+    props.put(Property.TABLE_SCAN_DISPATCHER.getKey(), 
TimedScanDispatcher.class.getName());
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.executor", 
"se1");
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "quick.time.ms", 
QUICK_SCAN_TIME);
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "long.executor", 
"se2");
+
+    env.getClient().tableOperations().create(tableName,
+        new NewTableConfiguration().setProperties(props));
+
+    long t1 = System.currentTimeMillis();
+    TestData.generate(env.getClient(), tableName, NUM_ROWS, NUM_FAMS, 
NUM_QUALS);
+    long t2 = System.currentTimeMillis();
+    env.getClient().tableOperations().compact(tableName, null, null, true, 
true);
+    long t3 = System.currentTimeMillis();
+
+    AtomicBoolean stop = new AtomicBoolean(false);
+
+    TestExecutor<Long> longScans = startLongScans(env, tableName, stop);
+
+    LongSummaryStatistics shortStats1 = runShortScans(env, tableName, 50000);
+    LongSummaryStatistics shortStats2 = runShortScans(env, tableName, 100000);
+
+    stop.set(true);
+    long t4 = System.currentTimeMillis();
+
+    LongSummaryStatistics longStats = longScans.stream().mapToLong(l -> 
l).summaryStatistics();
+
+    longScans.close();
+
+    Report.Builder builder = Report.builder();
+
+    builder.id("yfexec").description(TEST_DESC);
+    builder.info("write", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, "Data 
write rate entries/sec ");
+    builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "Compact 
rate entries/sec ");
+    builder.info("short_times1", shortStats1, "Times in ms for each short 
scan.  First run.");
+    builder.info("short_times2", shortStats2, "Times in ms for each short 
scan. Second run.");
+    builder.result("short", shortStats2.getAverage(),
+        "Average times in ms for short scans from 2nd run.");
+    builder.info("long_counts", longStats, "Entries read by each of the filter 
threads");
+    builder.info("long", longStats.getSum(), (t4 - t3),
+        "Combined rate in entries/second of all long scans.  This should be 
low but non-zero.");
+    builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used 
to run short scans.");
+    builder.parameter("long_threads", NUM_LONG_SCANS,
+        "Threads running long fileter scans.  Each thread repeatedly scans 
entire table for "
+        + "duration of test randomly returning a few of the keys.");
+    builder.parameter("rows", NUM_ROWS, "Rows in test table");
+    builder.parameter("familes", NUM_FAMS, "Families per row in test table");
+    builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test 
table");
+    builder.parameter("server_scan_threads", SCAN_EXECUTOR_THREADS,
+        "Server side scan handler threads that each executor has.  There are 2 
executors.");
+
+    builder.parameter("filter_probability", FILTER_PROBABILITY, "The chance 
that one of the long "
+        + "filter scans will return any key it sees.");
+    builder.parameter("filter_yield_time", FILTER_YIELD_TIME, "The time in ms 
after which one of "
+        + "the long filter scans will yield.");
+    builder.parameter("quick_scan_time", QUICK_SCAN_TIME, "The threshold time 
in ms for deciding "
+        + "what is a quick vs long scan.  Times less than this are sent to one 
executor and longer "
+        + "times are sent to another.");
+
+    return builder.build();
+  }
+
+  private static long scan(String tableName, AccumuloClient c, byte[] row, 
byte[] fam,
+      Map<String,String> hints) throws TableNotFoundException {
+    long t1 = System.currentTimeMillis();
+    int count = 0;
+    try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
+      // scanner.setExecutionHints(hints);
+      scanner.setRange(Range.exact(new Text(row), new Text(fam)));
+      if (Iterables.size(scanner) != NUM_QUALS) {
+        throw new RuntimeException("bad count " + count);
+      }
+    }
+
+    return System.currentTimeMillis() - t1;
+  }
+
+  private long scan(String tableName, AccumuloClient c, AtomicBoolean stop, 
Map<String,String> hints)
+      throws TableNotFoundException {
+    long count = 0;
+    while (!stop.get()) {
+      try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) 
{
+
+        IteratorSetting is = new IteratorSetting(30, ProbabilityFilter.class);
+        is.addOption("probability", FILTER_PROBABILITY);
+        is.addOption("yieldTimeMS", FILTER_YIELD_TIME);
+
+        scanner.addScanIterator(is);
+
+        // scanner.setExecutionHints(hints);
+        for (Entry<Key,Value> entry : scanner) {
+          count++;
+          if (stop.get()) {
+            return count;
+          }
+        }
+      }
+    }
+
+    return count;
+  }
+
+  private LongSummaryStatistics runShortScans(Environment env, String 
tableName, int numScans)
+      throws InterruptedException, ExecutionException {
+
+    Map<String,String> execHints = ImmutableMap.of("executor", "se2");
+    Map<String,String> prioHints = ImmutableMap.of("priority", "1");
+
+    try (TestExecutor<Long> executor = new 
TestExecutor<>(NUM_SHORT_SCANS_THREADS)) {
+      Random rand = new Random();
+
+      for (int i = 0; i < numScans; i++) {
+        byte[] row = TestData.row(rand.nextInt(NUM_ROWS));
+        byte[] fam = TestData.fam(rand.nextInt(NUM_FAMS));
+        // scans have a 20% chance of getting dedicated thread pool and 80% 
chance of getting high
+        // priority
+        Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : 
prioHints;
+        executor.submit(() -> scan(tableName, env.getClient(), row, fam, 
hints));
+      }
+
+      return executor.stream().mapToLong(l -> l).summaryStatistics();
+    }
+  }
+
+  private TestExecutor<Long> startLongScans(Environment env, String tableName, 
AtomicBoolean stop) {
+    Map<String,String> hints = ImmutableMap.of("priority", "2");
+
+    TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS);
+
+    for (int i = 0; i < NUM_LONG_SCANS; i++) {
+      longScans.submit(() -> scan(tableName, env.getClient(), stop, hints));
+    }
+    return longScans;
+  }
+}

Reply via email to