This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new f18c364  Improvements made for 2.0.0-alpha-2 CI testing (#51)
f18c364 is described below

commit f18c364aa1b58f849f65b4b492dda5226b28278c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Jan 26 17:03:41 2019 -0500

    Improvements made for 2.0.0-alpha-2 CI testing (#51)
    
    * Improvements made for 2.0.0-alpha-2 CI testing
    
    Added ability to set test options on command line. This makes running 
different
    types of ingest from the same docker image easy.  Without this change a new
    docker image would need to be created for each different ingest type.
    
    Documented commands for running a long running ingest test.
    
    Made the ingest flush size configurable.  This made it possible to write 
really
    small amounts of data and then pause.  This changes makes testing
    for the problem in apache/accumulo#854 possible.
    
    Added docs for how to copy docker image to other nodes if you don't have
    a repository handy.
---
 README.md                                          |  16 +-
 bin/agitator                                       |   6 +-
 bin/cingest                                        |   6 +-
 conf/accumulo-testing.properties.example           |   7 +-
 docs/ingest-test.md                                |  85 +++++++++
 .../java/org/apache/accumulo/testing/TestEnv.java  |  38 +++-
 .../org/apache/accumulo/testing/TestProps.java     |   2 +
 .../testing/continuous/ContinuousBatchWalker.java  |  44 ++---
 .../accumulo/testing/continuous/ContinuousEnv.java |   4 +-
 .../testing/continuous/ContinuousIngest.java       | 202 ++++++++++-----------
 .../testing/continuous/ContinuousMoru.java         |  16 +-
 .../testing/continuous/ContinuousScanner.java      |  96 +++++-----
 .../testing/continuous/ContinuousVerify.java       |  20 +-
 .../testing/continuous/ContinuousWalk.java         |  70 ++++---
 .../accumulo/testing/continuous/CreateTable.java   |  69 ++++---
 15 files changed, 389 insertions(+), 292 deletions(-)

diff --git a/README.md b/README.md
index 1a758a1..f9acb26 100644
--- a/README.md
+++ b/README.md
@@ -66,13 +66,18 @@ run in Docker:
 3. Multiple containers can also be run (if you have [Docker Swarm] enabled):
 
    ```bash
+   # the following can be used to get the image on all nodes if you do not 
have a registry.
+   for HOST in node1 node2 node3; do
+     docker save accumulo-testing | ssh -C $HOST docker load &
+   done
+
    docker service create --network="host" --replicas 2 --name ci 
accumulo-testing cingest ingest
    ```
 
 ## Random walk test
 
 The random walk test generates client behavior on an Apache Accumulo instance 
by randomly walking a
-graph of client operations. 
+graph of client operations.
 
 Before running random walk, review the `test.common.*` properties in 
`accumulo-testing.properties`
 file. A test module must also be specified. See the [modules] directory for a 
list of available ones.
@@ -101,12 +106,12 @@ First, run the command below to create an Accumulo table 
for the continuous inge
 table is set by the property `test.ci.common.accumulo.table` (its value 
defaults to `ci`) in the file
 `accumulo-testing.properties`:
 
-          ./bin/cingest createtable
+          ./bin/cingest createtable {-o test.<prop>=<value>}
 
 The continuous ingest tests have several applications that start a local 
application which will run
 continuously until you stop using `ctrl-c`:
 
-          ./bin/cingest <application>
+          ./bin/cingest <application> {-o test.<prop>=<value>}
 
 Below is a list of available continuous ingest applications. You should run 
the `ingest` application
 first to add data to your table.
@@ -131,6 +136,9 @@ table. This MapReduce job will write out an entry for every 
entry in the table (
 created by the MapReduce job itself). Stop ingest before running this 
MapReduce job. Do not run more
 than one instance of this MapReduce job concurrently against a table.
 
+Checkout [ingest-test.md](docs/ingest-test.md) for pointers on running a long
+running ingest and verification test.
+
 ## Agitator
 
 The agitator will periodically kill the Accumulo master, tablet server, and 
Hadoop data node
@@ -208,7 +216,7 @@ function stop_cluster {
 ```
 
 An example script for [Uno] is provided.  To use this do the following and set
-`UNO_HOME` after copying. 
+`UNO_HOME` after copying.
 
     cp conf/cluster-control.sh.uno conf/cluster-control.sh
 
diff --git a/bin/agitator b/bin/agitator
index 8933b7f..bd2b445 100755
--- a/bin/agitator
+++ b/bin/agitator
@@ -29,10 +29,10 @@ Possible commands:
 EOF
 }
 
-if [ -f "$at_home/conf/accumulo-testing-env.sh" ]; then
-  . "$at_home"/conf/accumulo-testing-env.sh
+if [ -f "$at_home/conf/env.sh" ]; then
+  . "$at_home"/conf/env.sh
 else
-  . "$at_home"/conf/accumulo-testing-env.sh.example
+  . "$at_home"/conf/env.sh.example
 fi
 
 function start_agitator() {
diff --git a/bin/cingest b/bin/cingest
index 6cbbb40..39e6d33 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -21,7 +21,7 @@ at_home=$( cd "$( dirname "$bin_dir" )" && pwd )
 function print_usage() {
   cat <<EOF
 
-Usage: cingest <application>
+Usage: cingest <application> {-o test.<prop>=<value>}
 
 Available applications:
 
@@ -83,12 +83,12 @@ case "$1" in
   verify|moru)
     if [ ! -z $HADOOP_HOME ]; then
       export HADOOP_USE_CLIENT_CLASSLOADER=true
-      "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$ci_main" "$TEST_PROPS" 
"$ACCUMULO_CLIENT_PROPS"
+      "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$ci_main" "${@:2}" 
"$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
     else
       echo "Hadoop must be installed and HADOOP_HOME must be set!"
       exit 1
     fi
     ;;
   *)
-    java -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "$TEST_PROPS" 
"$ACCUMULO_CLIENT_PROPS"
+    java -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "${@:2}" 
"$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
 esac
diff --git a/conf/accumulo-testing.properties.example 
b/conf/accumulo-testing.properties.example
index b084c4c..b64a86e 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -44,6 +44,8 @@ test.ci.common.auths=
 # ------
 # Number of entries each ingest client should write
 test.ci.ingest.client.entries=9223372036854775807
+# Flush batch writer after this many entries.
+test.ci.ingest.entries.flush=1000000
 # Minimum random row to generate
 test.ci.ingest.row.min=0
 # Maximum random row to generate
@@ -57,7 +59,8 @@ test.ci.ingest.max.cq=32767
 test.ci.ingest.visibilities=
 # Checksums will be generated during ingest if set to true
 test.ci.ingest.checksum=true
-# Enables periodic pausing of ingest
+# Enables periodic pausing of ingest. Pause checks are only done after a 
flush. To write small
+# amounts of data and then pause, set pause.wait.max and entries.flush small.
 test.ci.ingest.pause.enabled=false
 # Minimum wait between ingest pauses (in seconds)
 test.ci.ingest.pause.wait.min=120
@@ -90,7 +93,7 @@ test.ci.scanner.entries=5000
 # Verify
 # -----
 # Maximum number of mapreduce mappers
-test.ci.verify.max.maps=64
+test.ci.verify.max.maps=4096
 # Number of mapreduce reducers
 test.ci.verify.reducers=64
 # Perform the verification directly on the files while the table is offline
diff --git a/docs/ingest-test.md b/docs/ingest-test.md
new file mode 100644
index 0000000..fcec446
--- /dev/null
+++ b/docs/ingest-test.md
@@ -0,0 +1,85 @@
+# Running a long continuous ingest test
+
+Running continuous ingest for long periods on a cluster is a good way to
+validate an Accumulo release.  This document outlines one possible way to do
+that.  The commands below show how to start different types of ingest into 
+multiple tables.  These commands assume the docker image was created for 
+accumulo-testing and that docker swarm is available on the cluster.
+
+```bash
+# Number of nodes in cluster
+NUM_NODES=10
+
+# Start lots of processes writing to a single table as fast as possible
+docker run --network="host" accumulo-testing cingest createtable
+docker service create --network="host" --replicas $NUM_NODES --name ci \
+   accumulo-testing cingest ingest \
+   -o test.ci.ingest.pause.enabled=false
+
+# Write data with pauses.  Should cause tablets to not have data in some write
+# ahead logs.  But there is still enough write pressure to cause minor
+# compactions.
+#
+# Some of the write ahead log recovery bugs fixed in 1.9.1 and 1.9.2 were not
+# seen with continuous writes.
+#
+for i in $(seq 1 $NUM_NODES); do
+  TABLE="cip_$i"
+  docker run --network="host" accumulo-testing cingest createtable \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.common.accumulo.num.tablets=$(( $NUM_NODES * 4 ))
+  docker service create --network="host" --replicas 1 --name $TABLE \
+     accumulo-testing cingest ingest \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.ingest.pause.enabled=true
+done
+
+# Write very small amounts of data with long pauses in between.  Should cause
+# data to be spread across lots of write ahead logs.  So little data is written
+# that minor compactions may not happen because of writes.  If Accumulo does
+# nothing then this will result in tablet servers having lots of write ahead
+# logs.
+#
+# https://github.com/apache/accumulo/issues/854
+#
+for FLUSH_SIZE in 97 101 997 1009 ; do
+  TABLE="cip_small_$FLUSH_SIZE"
+  docker run --network="host" accumulo-testing cingest createtable \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.common.accumulo.num.tablets=$(( $NUM_NODES * 2 ))
+  docker service create --network="host" --replicas 1 --name $TABLE \
+     accumulo-testing cingest ingest \
+     -o test.ci.common.accumulo.table=$TABLE \
+     -o test.ci.ingest.pause.enabled=true \
+     -o test.ci.ingest.pause.wait.min=1 \
+     -o test.ci.ingest.pause.wait.max=3 \
+     -o test.ci.ingest.entries.flush=$FLUSH_SIZE
+done
+```
+
+After starting the ingest, consider starting the agitator.  Testing with and
+without agitation is valuable.  Let the ingest run for a period of 12 to 36
+hours. Then stop it as follows.
+
+
+```bash
+# stop the agitator if started
+
+# stop all docker services (assuming its only ingest started above, otherwise 
do not run)
+docker service rm $(docker service ls -q)
+```
+
+After ingest stops verify the data.
+
+```bash
+# run verification map reduce jobs
+mkdir -p logs
+accumulo shell -u root -p secret -e tables | grep ci | while read table ; do
+  nohup ./bin/cingest verify \
+      -o test.ci.common.accumulo.table=$table \
+      -o test.ci.verify.output.dir=/tmp/$table-verify \
+          &> logs/verify_$table.log &
+done
+```
+
+
diff --git a/src/main/java/org/apache/accumulo/testing/TestEnv.java 
b/src/main/java/org/apache/accumulo/testing/TestEnv.java
index 601db3c..d36fa1a 100644
--- a/src/main/java/org/apache/accumulo/testing/TestEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/TestEnv.java
@@ -1,8 +1,10 @@
 package org.apache.accumulo.testing;
 
-import static java.util.Objects.requireNonNull;
-
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -19,12 +21,36 @@ public class TestEnv implements AutoCloseable {
   private AccumuloClient client = null;
   private Configuration hadoopConfig = null;
 
-  public TestEnv(String testPropsPath, String clientPropsPath) {
-    requireNonNull(testPropsPath);
-    requireNonNull(clientPropsPath);
+  public TestEnv(String[] args) {
+
+    Map<String, String> options = new HashMap<>();
+    List<String> arguments = new ArrayList<>();
+
+    for (int i = 0; i < args.length; i++) {
+      if(args[i].equals("-o")) {
+        i++;
+        String[] tokens = args[i].split("=",2);
+        options.put(tokens[0], tokens[1]);
+      } else {
+        arguments.add(args[i]);
+      }
+    }
+
+    if(arguments.size() != 2) {
+      throw new IllegalArgumentException("Expected <testPropsPath> 
<clientPropsPath> arguments.");
+    }
+
+    String testPropsPath = arguments.get(0);
+    clientPropsPath = arguments.get(1);
+
     this.testProps = TestProps.loadFromFile(testPropsPath);
-    this.clientPropsPath = clientPropsPath;
     this.clientProps = 
Accumulo.newClientProperties().from(clientPropsPath).build();
+
+    options.forEach((k,v) -> testProps.setProperty(k, v));
+  }
+
+  public TestEnv(String testPropsPath, String clientPropsPath) {
+    this(new String[] {testPropsPath, clientPropsPath});
   }
 
   private Properties copyProperties(Properties props) {
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java 
b/src/main/java/org/apache/accumulo/testing/TestProps.java
index f8eb640..ad49fd3 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -82,6 +82,8 @@ public class TestProps {
   public static final String CI_INGEST_PAUSE_DURATION_MIN = CI_INGEST + 
"pause.duration.min";
   // Maximum pause duration (in seconds)
   public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + 
"pause.duration.max";
+  // Amount of data to write before flushing. Pause checks are only done after 
flush.
+  public static final String CI_INGEST_FLUSH_ENTRIES = CI_INGEST + 
"entries.flush";
 
   /** Batch Walker **/
   // Sleep time between batch scans (in ms)
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
index 1716bdf..6c02f7f 100644
--- 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
+++ 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousBatchWalker.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.testing.continuous;
 
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -35,40 +37,34 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
 
-import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
 public class ContinuousBatchWalker {
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousBatchWalker <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
-
-    Authorizations auths = env.getRandomAuthorizations();
-    AccumuloClient client = env.getAccumuloClient();
-    Scanner scanner = ContinuousUtil.createScanner(client, 
env.getAccumuloTableName(), auths);
-    int scanBatchSize = 
Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
-    scanner.setBatchSize(scanBatchSize);
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
+      Authorizations auths = env.getRandomAuthorizations();
+      AccumuloClient client = env.getAccumuloClient();
+      Scanner scanner = ContinuousUtil.createScanner(client, 
env.getAccumuloTableName(), auths);
+      int scanBatchSize = 
Integer.parseInt(env.getTestProperty(TestProps.CI_BW_BATCH_SIZE));
+      scanner.setBatchSize(scanBatchSize);
 
-    Random r = new Random();
+      Random r = new Random();
 
-    while (true) {
-      BatchScanner bs = client.createBatchScanner(env.getAccumuloTableName(), 
auths);
+      while (true) {
+        BatchScanner bs = 
client.createBatchScanner(env.getAccumuloTableName(), auths);
 
-      Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), 
scanBatchSize, r);
-      List<Range> ranges = new ArrayList<>(batch.size());
+        Set<Text> batch = getBatch(scanner, env.getRowMin(), env.getRowMax(), 
scanBatchSize, r);
+        List<Range> ranges = new ArrayList<>(batch.size());
 
-      for (Text row : batch) {
-        ranges.add(new Range(row));
-      }
+        for (Text row : batch) {
+          ranges.add(new Range(row));
+        }
 
-      runBatchScan(scanBatchSize, bs, batch, ranges);
+        runBatchScan(scanBatchSize, bs, batch, ranges);
 
-      int bwSleepMs = 
Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS));
-      sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS);
+        int bwSleepMs = 
Integer.parseInt(env.getTestProperty(TestProps.CI_BW_SLEEP_MS));
+        sleepUninterruptibly(bwSleepMs, TimeUnit.MILLISECONDS);
+      }
     }
   }
 
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
index 9642cf3..7d9f89c 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
@@ -13,8 +13,8 @@ class ContinuousEnv extends TestEnv {
 
   private List<Authorizations> authList;
 
-  ContinuousEnv(String testPropsPath, String clientPropsPath) {
-    super(testPropsPath, clientPropsPath);
+  ContinuousEnv(String[] args) {
+    super(args);
   }
 
   /**
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index a62ed13..e4a5935 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -84,6 +84,10 @@ public class ContinuousIngest {
     return (rand.nextInt(durationMax - durationMin) + durationMin);
   }
 
+  private static int getFlushEntries(Properties props) {
+    return 
Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, 
"1000000"));
+  }
+
   private static void pauseCheck(Properties props, Random rand) throws 
InterruptedException {
     if (pauseEnabled(props)) {
       long elapsedNano = System.nanoTime() - lastPauseNs;
@@ -100,110 +104,88 @@ public class ContinuousIngest {
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousIngest <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+      String vis = env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES);
+      if (vis == null) {
+        visibilities = Collections.singletonList(new ColumnVisibility());
+      } else {
+        visibilities = new ArrayList<>();
+        for (String v : vis.split(",")) {
+          visibilities.add(new ColumnVisibility(v.trim()));
+        }
+      }
 
-    String vis = env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES);
-    if (vis == null) {
-      visibilities = Collections.singletonList(new ColumnVisibility());
-    } else {
-      visibilities = new ArrayList<>();
-      for (String v : vis.split(",")) {
-        visibilities.add(new ColumnVisibility(v.trim()));
+      long rowMin = env.getRowMin();
+      long rowMax = env.getRowMax();
+      if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
+        throw new IllegalArgumentException("bad min and max");
       }
-    }
 
-    long rowMin = env.getRowMin();
-    long rowMax = env.getRowMax();
-    if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
-      throw new IllegalArgumentException("bad min and max");
-    }
+      AccumuloClient client = env.getAccumuloClient();
+      String tableName = env.getAccumuloTableName();
+      if (!client.tableOperations().exists(tableName)) {
+        throw new TableNotFoundException(null, tableName,
+            "Consult the README and create the table before starting ingest.");
+      }
 
-    AccumuloClient client = env.getAccumuloClient();
-    String tableName = env.getAccumuloTableName();
-    if (!client.tableOperations().exists(tableName)) {
-      throw new TableNotFoundException(null, tableName,
-          "Consult the README and create the table before starting ingest.");
-    }
+      BatchWriter bw = client.createBatchWriter(tableName);
+      bw = Trace.wrapAll(bw, TraceSamplers.countSampler(1024));
 
-    BatchWriter bw = client.createBatchWriter(tableName);
-    bw = Trace.wrapAll(bw, TraceSamplers.countSampler(1024));
-
-    Random r = new Random();
-
-    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
-
-    log.info(String.format("UUID %d %s", System.currentTimeMillis(), new 
String(ingestInstanceId,
-        UTF_8)));
-
-    long count = 0;
-    final int flushInterval = 1000000;
-    final int maxDepth = 25;
-
-    // always want to point back to flushed data. This way the previous item
-    // should
-    // always exist in accumulo when verifying data. To do this make insert
-    // N point
-    // back to the row from insert (N - flushInterval). The array below is
-    // used to keep
-    // track of this.
-    long prevRows[] = new long[flushInterval];
-    long firstRows[] = new long[flushInterval];
-    int firstColFams[] = new int[flushInterval];
-    int firstColQuals[] = new int[flushInterval];
-
-    long lastFlushTime = System.currentTimeMillis();
-
-    int maxColF = env.getMaxColF();
-    int maxColQ = env.getMaxColQ();
-    boolean checksum = 
Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM));
-    long numEntries = 
Long.parseLong(env.getTestProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
-
-    Properties testProps = env.getTestProperties();
-    if (pauseEnabled(testProps)) {
-      lastPauseNs = System.nanoTime();
-      pauseWaitSec = getPauseWaitSec(testProps, r);
-      log.info("PAUSING enabled");
-      log.info("INGESTING for " + pauseWaitSec + "s");
-    }
+      Random r = new Random();
+
+      byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
 
-    out: while (true) {
-      // generate first set of nodes
-      ColumnVisibility cv = getVisibility(r);
+      log.info(String.format("UUID %d %s", System.currentTimeMillis(), new 
String(ingestInstanceId,
+          UTF_8)));
 
-      for (int index = 0; index < flushInterval; index++) {
-        long rowLong = genLong(rowMin, rowMax, r);
-        prevRows[index] = rowLong;
-        firstRows[index] = rowLong;
+      long count = 0;
+      final int flushInterval = getFlushEntries(env.getTestProperties());
+      final int maxDepth = 25;
 
-        int cf = r.nextInt(maxColF);
-        int cq = r.nextInt(maxColQ);
+      // always want to point back to flushed data. This way the previous item
+      // should
+      // always exist in accumulo when verifying data. To do this make insert
+      // N point
+      // back to the row from insert (N - flushInterval). The array below is
+      // used to keep
+      // track of this.
+      long prevRows[] = new long[flushInterval];
+      long firstRows[] = new long[flushInterval];
+      int firstColFams[] = new int[flushInterval];
+      int firstColQuals[] = new int[flushInterval];
 
-        firstColFams[index] = cf;
-        firstColQuals[index] = cq;
+      long lastFlushTime = System.currentTimeMillis();
 
-        Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, 
null, checksum);
-        count++;
-        bw.addMutation(m);
+      int maxColF = env.getMaxColF();
+      int maxColQ = env.getMaxColQ();
+      boolean checksum = 
Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM));
+      long numEntries = 
Long.parseLong(env.getTestProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+
+      Properties testProps = env.getTestProperties();
+      if (pauseEnabled(testProps)) {
+        lastPauseNs = System.nanoTime();
+        pauseWaitSec = getPauseWaitSec(testProps, r);
+        log.info("PAUSING enabled");
+        log.info("INGESTING for " + pauseWaitSec + "s");
       }
 
-      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-      if (count >= numEntries)
-        break out;
+      out: while (true) {
+        // generate first set of nodes
+        ColumnVisibility cv = getVisibility(r);
 
-      // generate subsequent sets of nodes that link to previous set of
-      // nodes
-      for (int depth = 1; depth < maxDepth; depth++) {
         for (int index = 0; index < flushInterval; index++) {
           long rowLong = genLong(rowMin, rowMax, r);
-          byte[] prevRow = genRow(prevRows[index]);
           prevRows[index] = rowLong;
-          Mutation m = genMutation(rowLong, r.nextInt(maxColF), 
r.nextInt(maxColQ), cv,
-              ingestInstanceId, count, prevRow, checksum);
+          firstRows[index] = rowLong;
+
+          int cf = r.nextInt(maxColF);
+          int cq = r.nextInt(maxColQ);
+
+          firstColFams[index] = cf;
+          firstColQuals[index] = cq;
+
+          Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, 
count, null, checksum);
           count++;
           bw.addMutation(m);
         }
@@ -211,23 +193,41 @@ public class ContinuousIngest {
         lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
         if (count >= numEntries)
           break out;
-        pauseCheck(testProps, r);
-      }
 
-      // create one big linked list, this makes all of the first inserts
-      // point to something
-      for (int index = 0; index < flushInterval - 1; index++) {
-        Mutation m = genMutation(firstRows[index], firstColFams[index], 
firstColQuals[index], cv,
-            ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
-        count++;
-        bw.addMutation(m);
+        // generate subsequent sets of nodes that link to previous set of
+        // nodes
+        for (int depth = 1; depth < maxDepth; depth++) {
+          for (int index = 0; index < flushInterval; index++) {
+            long rowLong = genLong(rowMin, rowMax, r);
+            byte[] prevRow = genRow(prevRows[index]);
+            prevRows[index] = rowLong;
+            Mutation m = genMutation(rowLong, r.nextInt(maxColF), 
r.nextInt(maxColQ), cv,
+                ingestInstanceId, count, prevRow, checksum);
+            count++;
+            bw.addMutation(m);
+          }
+
+          lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+          if (count >= numEntries)
+            break out;
+          pauseCheck(testProps, r);
+        }
+
+        // create one big linked list, this makes all of the first inserts
+        // point to something
+        for (int index = 0; index < flushInterval - 1; index++) {
+          Mutation m = genMutation(firstRows[index], firstColFams[index], 
firstColQuals[index], cv,
+              ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
+          count++;
+          bw.addMutation(m);
+        }
+        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+        if (count >= numEntries)
+          break out;
+        pauseCheck(testProps, r);
       }
-      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-      if (count >= numEntries)
-        break out;
-      pauseCheck(testProps, r);
+      bw.close();
     }
-    bw.close();
   }
 
   private static long flush(BatchWriter bw, long count, final int 
flushInterval, long lastFlushTime)
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
index 663ed33..c7d102c 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousMoru.java
@@ -23,13 +23,13 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -113,11 +113,7 @@ public class ContinuousMoru extends Configured implements 
Tool {
   @Override
   public int run(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousMoru <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    ContinuousEnv env = new ContinuousEnv(args);
 
     Job job = Job.getInstance(getConf(),
         this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
@@ -151,11 +147,7 @@ public class ContinuousMoru extends Configured implements 
Tool {
   }
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousMoru <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    ContinuousEnv env = new ContinuousEnv(args);
     int res = ToolRunner.run(env.getHadoopConfiguration(), new 
ContinuousMoru(), args);
     if (res != 0)
       System.exit(res);
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
index e4bab76..9fd4a80 100644
--- 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
+++ 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousScanner.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.testing.continuous;
 
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.util.Iterator;
@@ -32,75 +33,70 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.io.Text;
 
-import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-
 public class ContinuousScanner {
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousScanner <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
 
-    Random r = new Random();
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-    long distance = 1000000000000l;
+      Random r = new Random();
 
-    AccumuloClient client = env.getAccumuloClient();
-    Authorizations auths = env.getRandomAuthorizations();
-    Scanner scanner = ContinuousUtil.createScanner(client, 
env.getAccumuloTableName(), auths);
+      long distance = 1000000000000l;
 
-    int numToScan = 
Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_ENTRIES));
-    int scannerSleepMs = 
Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_SLEEP_MS));
+      AccumuloClient client = env.getAccumuloClient();
+      Authorizations auths = env.getRandomAuthorizations();
+      Scanner scanner = ContinuousUtil.createScanner(client, 
env.getAccumuloTableName(), auths);
 
-    double delta = Math.min(.05, .05 / (numToScan / 1000.0));
+      int numToScan = 
Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_ENTRIES));
+      int scannerSleepMs = 
Integer.parseInt(env.getTestProperty(TestProps.CI_SCANNER_SLEEP_MS));
 
-    while (true) {
-      long startRow = ContinuousIngest.genLong(env.getRowMin(), 
env.getRowMax() - distance, r);
-      byte[] scanStart = ContinuousIngest.genRow(startRow);
-      byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
+      double delta = Math.min(.05, .05 / (numToScan / 1000.0));
 
-      scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
+      while (true) {
+        long startRow = ContinuousIngest.genLong(env.getRowMin(), 
env.getRowMax() - distance, r);
+        byte[] scanStart = ContinuousIngest.genRow(startRow);
+        byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
 
-      int count = 0;
-      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+        scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
 
-      long t1 = System.currentTimeMillis();
+        int count = 0;
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
 
-      while (iter.hasNext()) {
-        Entry<Key,Value> entry = iter.next();
-        ContinuousWalk.validate(entry.getKey(), entry.getValue());
-        count++;
-      }
+        long t1 = System.currentTimeMillis();
 
-      long t2 = System.currentTimeMillis();
-
-      // System.out.println("P1 " +count +" "+((1-delta) *
-      // numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
-
-      if (count < (1 - delta) * numToScan || count > (1 + delta) * numToScan) {
-        if (count == 0) {
-          distance = distance * 10;
-          if (distance < 0)
-            distance = 1000000000000l;
-        } else {
-          double ratio = (double) numToScan / count;
-          // move ratio closer to 1 to make change slower
-          ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
-          distance = (long) (ratio * distance);
+        while (iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          ContinuousWalk.validate(entry.getKey(), entry.getValue());
+          count++;
         }
 
-        // System.out.println("P2 "+delta
-        // +" "+numToScan+" "+distance+"  "+((double)numToScan/count ));
-      }
+        long t2 = System.currentTimeMillis();
+
+        // System.out.println("P1 " +count +" "+((1-delta) *
+        // numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
+
+        if (count < (1 - delta) * numToScan || count > (1 + delta) * 
numToScan) {
+          if (count == 0) {
+            distance = distance * 10;
+            if (distance < 0)
+              distance = 1000000000000l;
+          } else {
+            double ratio = (double) numToScan / count;
+            // move ratio closer to 1 to make change slower
+            ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
+            distance = (long) (ratio * distance);
+          }
+
+          // System.out.println("P2 "+delta
+          // +" "+numToScan+" "+distance+"  "+((double)numToScan/count ));
+        }
 
-      System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), 
(t2 - t1), count);
+        System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, 
UTF_8), (t2 - t1), count);
 
-      if (scannerSleepMs > 0) {
-        sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+        if (scannerSleepMs > 0) {
+          sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+        }
       }
     }
-
   }
 }
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
index e862d66..d94764e 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java
@@ -26,10 +26,10 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -140,21 +140,19 @@ public class ContinuousVerify extends Configured 
implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousVerify <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+
+    ContinuousEnv env = new ContinuousEnv(args);
+
+    String tableName = env.getAccumuloTableName();
 
     Job job = Job.getInstance(getConf(),
-        this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+        this.getClass().getSimpleName() + "_"+ tableName + "_" + 
System.currentTimeMillis());
     job.setJarByClass(this.getClass());
 
     job.setInputFormatClass(AccumuloInputFormat.class);
 
     boolean scanOffline = Boolean.parseBoolean(env
         .getTestProperty(TestProps.CI_VERIFY_SCAN_OFFLINE));
-    String tableName = env.getAccumuloTableName();
     int maxMaps = 
Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_MAX_MAPS));
     int reducers = 
Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_REDUCERS));
     String outputDir = env.getTestProperty(TestProps.CI_VERIFY_OUTPUT_DIR);
@@ -202,11 +200,7 @@ public class ContinuousVerify extends Configured 
implements Tool {
   }
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousVerify <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    ContinuousEnv env = new ContinuousEnv(args);
 
     int res = ToolRunner.run(env.getHadoopConfiguration(), new 
ContinuousVerify(), args);
     if (res != 0)
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
index 7f8a024..2094ec9 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java
@@ -45,58 +45,56 @@ public class ContinuousWalk {
   }
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: ContinuousWalk <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
 
-    AccumuloClient client = env.getAccumuloClient();
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
+
+      AccumuloClient client = env.getAccumuloClient();
 
-    Random r = new Random();
+      Random r = new Random();
 
-    ArrayList<Value> values = new ArrayList<>();
+      ArrayList<Value> values = new ArrayList<>();
 
-    int sleepTime = 
Integer.parseInt(env.getTestProperty(TestProps.CI_WALKER_SLEEP_MS));
+      int sleepTime = 
Integer.parseInt(env.getTestProperty(TestProps.CI_WALKER_SLEEP_MS));
 
-    while (true) {
-      Scanner scanner = ContinuousUtil.createScanner(client, 
env.getAccumuloTableName(),
-          env.getRandomAuthorizations());
-      String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, r);
+      while (true) {
+        Scanner scanner = ContinuousUtil.createScanner(client, 
env.getAccumuloTableName(),
+            env.getRandomAuthorizations());
+        String row = findAStartRow(env.getRowMin(), env.getRowMax(), scanner, 
r);
 
-      while (row != null) {
+        while (row != null) {
 
-        values.clear();
+          values.clear();
 
-        long t1 = System.currentTimeMillis();
-        Span span = Trace.on("walk");
-        try {
-          scanner.setRange(new Range(new Text(row)));
-          for (Entry<Key,Value> entry : scanner) {
-            validate(entry.getKey(), entry.getValue());
-            values.add(entry.getValue());
+          long t1 = System.currentTimeMillis();
+          Span span = Trace.on("walk");
+          try {
+            scanner.setRange(new Range(new Text(row)));
+            for (Entry<Key,Value> entry : scanner) {
+              validate(entry.getKey(), entry.getValue());
+              values.add(entry.getValue());
+            }
+          } finally {
+            span.stop();
           }
-        } finally {
-          span.stop();
-        }
-        long t2 = System.currentTimeMillis();
+          long t2 = System.currentTimeMillis();
 
-        System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), 
values.size());
+          System.out.printf("SRQ %d %s %d %d%n", t1, row, (t2 - t1), 
values.size());
 
-        if (values.size() > 0) {
-          row = getPrevRow(values.get(r.nextInt(values.size())));
-        } else {
-          System.out.printf("MIS %d %s%n", t1, row);
-          System.err.printf("MIS %d %s%n", t1, row);
-          row = null;
+          if (values.size() > 0) {
+            row = getPrevRow(values.get(r.nextInt(values.size())));
+          } else {
+            System.out.printf("MIS %d %s%n", t1, row);
+            System.err.printf("MIS %d %s%n", t1, row);
+            row = null;
+          }
+
+          if (sleepTime > 0)
+            Thread.sleep(sleepTime);
         }
 
         if (sleepTime > 0)
           Thread.sleep(sleepTime);
       }
-
-      if (sleepTime > 0)
-        Thread.sleep(sleepTime);
     }
   }
 
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java 
b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
index fb4eeea..9558ef9 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/CreateTable.java
@@ -27,47 +27,44 @@ public class CreateTable {
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      System.err.println("Usage: CreateTable <testPropsPath> 
<clientPropsPath>");
-      System.exit(-1);
-    }
-    ContinuousEnv env = new ContinuousEnv(args[0], args[1]);
+    try (ContinuousEnv env = new ContinuousEnv(args)) {
 
-    AccumuloClient client = env.getAccumuloClient();
-    String tableName = env.getAccumuloTableName();
-    if (client.tableOperations().exists(tableName)) {
-      System.err.println("ERROR: Accumulo table '" + tableName + "' already 
exists");
-      System.exit(-1);
-    }
+      AccumuloClient client = env.getAccumuloClient();
+      String tableName = env.getAccumuloTableName();
+      if (client.tableOperations().exists(tableName)) {
+        System.err.println("ERROR: Accumulo table '" + tableName + "' already 
exists");
+        System.exit(-1);
+      }
 
-    int numTablets = Integer
-        
.parseInt(env.getTestProperty(TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS));
-    if (numTablets < 1) {
-      System.err.println("ERROR: numTablets < 1");
-      System.exit(-1);
-    }
-    if (env.getRowMin() >= env.getRowMax()) {
-      System.err.println("ERROR: min >= max");
-      System.exit(-1);
-    }
+      int numTablets = Integer.parseInt(env
+          .getTestProperty(TestProps.CI_COMMON_ACCUMULO_NUM_TABLETS));
+      if (numTablets < 1) {
+        System.err.println("ERROR: numTablets < 1");
+        System.exit(-1);
+      }
+      if (env.getRowMin() >= env.getRowMax()) {
+        System.err.println("ERROR: min >= max");
+        System.exit(-1);
+      }
 
-    client.tableOperations().create(tableName);
+      client.tableOperations().create(tableName);
 
-    SortedSet<Text> splits = new TreeSet<>();
-    int numSplits = numTablets - 1;
-    long distance = ((env.getRowMax() - env.getRowMin()) / numTablets) + 1;
-    long split = distance;
-    for (int i = 0; i < numSplits; i++) {
-      String s = String.format("%016x", split + env.getRowMin());
-      while (s.charAt(s.length() - 1) == '0') {
-        s = s.substring(0, s.length() - 1);
+      SortedSet<Text> splits = new TreeSet<>();
+      int numSplits = numTablets - 1;
+      long distance = ((env.getRowMax() - env.getRowMin()) / numTablets) + 1;
+      long split = distance;
+      for (int i = 0; i < numSplits; i++) {
+        String s = String.format("%016x", split + env.getRowMin());
+        while (s.charAt(s.length() - 1) == '0') {
+          s = s.substring(0, s.length() - 1);
+        }
+        splits.add(new Text(s));
+        split += distance;
       }
-      splits.add(new Text(s));
-      split += distance;
-    }
 
-    client.tableOperations().addSplits(tableName, splits);
-    System.out
-        .println("Created Accumulo table '" + tableName + "' with " + 
numTablets + " tablets");
+      client.tableOperations().addSplits(tableName, splits);
+      System.out.println("Created Accumulo table '" + tableName + "' with " + 
numTablets
+          + " tablets");
+    }
   }
 }

Reply via email to