This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/1.9 by this push:
new 288a34e Added random pausing to continuous ingest (#15)
288a34e is described below
commit 288a34ea653b22f256267512c2fbd3b4d0edf473
Author: Mike Walch <[email protected]>
AuthorDate: Thu Jul 5 11:09:26 2018 -0400
Added random pausing to continuous ingest (#15)
---
conf/accumulo-testing.properties.example | 10 ++++
.../apache/accumulo/testing/core/TestProps.java | 10 ++++
.../testing/core/continuous/ContinuousIngest.java | 61 ++++++++++++++++++++--
3 files changed, 78 insertions(+), 3 deletions(-)
diff --git a/conf/accumulo-testing.properties.example
b/conf/accumulo-testing.properties.example
index 7468443..4fff104 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -77,6 +77,16 @@ test.ci.ingest.max.cq=32767
test.ci.ingest.visibilities=
# Checksums will be generated during ingest if set to true
test.ci.ingest.checksum=true
+# Enables periodic pausing of ingest
+test.ci.ingest.pause.enabled=false
+# Minimum wait between ingest pauses (in seconds)
+test.ci.ingest.pause.wait.min=120
+# Maximum wait between ingest pauses (in seconds)
+test.ci.ingest.pause.wait.max=180
+# Minimum pause duration (in seconds)
+test.ci.ingest.pause.duration.min=60
+# Maximum pause duration (in seconds)
+test.ci.ingest.pause.duration.max=120
# Batch walker
# ------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
index b0927db..638d2db 100644
--- a/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
+++ b/core/src/main/java/org/apache/accumulo/testing/core/TestProps.java
@@ -92,6 +92,16 @@ public class TestProps {
public static final String CI_INGEST_VISIBILITIES = CI_INGEST +
"visibilities";
// Checksums will be generated during ingest if set to true
public static final String CI_INGEST_CHECKSUM = CI_INGEST + "checksum";
+ // Enables periodic pausing of ingest
+ public static final String CI_INGEST_PAUSE_ENABLED = CI_INGEST +
"pause.enabled";
+ // Minimum wait between ingest pauses (in seconds)
+ public static final String CI_INGEST_PAUSE_WAIT_MIN = CI_INGEST +
"pause.wait.min";
+ // Maximum wait between ingest pauses (in seconds)
+ public static final String CI_INGEST_PAUSE_WAIT_MAX = CI_INGEST +
"pause.wait.max";
+ // Minimum pause duration (in seconds)
+ public static final String CI_INGEST_PAUSE_DURATION_MIN = CI_INGEST +
"pause.duration.min";
+ // Maximum pause duration (in seconds)
+ public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST +
"pause.duration.max";
/** Batch Walker **/
// Sleep time between batch scans (in ms)
diff --git
a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
index db281a7..4afd00c 100644
---
a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
+++
b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
@@ -24,9 +24,11 @@ import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -39,17 +41,62 @@ import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.testing.core.TestProps;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ContinuousIngest {
+ private static final Logger log =
LoggerFactory.getLogger(ContinuousIngest.class);
+
private static final byte[] EMPTY_BYTES = new byte[0];
private static List<ColumnVisibility> visibilities;
+ private static long lastPauseNs;
+ private static long pauseWaitSec;
private static ColumnVisibility getVisibility(Random rand) {
return visibilities.get(rand.nextInt(visibilities.size()));
}
+ private static boolean pauseEnabled(Properties props) {
+ String value = props.getProperty(TestProps.CI_INGEST_PAUSE_ENABLED);
+ return Boolean.parseBoolean(value);
+ }
+
+ private static int getPauseWaitSec(Properties props, Random rand) {
+ int waitMin =
Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
+ int waitMax =
Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
+ Preconditions.checkState(waitMax >= waitMin && waitMin > 0);
+ if (waitMax == waitMin) {
+ return waitMin;
+ }
+ return (rand.nextInt(waitMax - waitMin) + waitMin);
+ }
+
+ private static int getPauseDurationSec(Properties props, Random rand) {
+ int durationMin =
Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MIN));
+ int durationMax =
Integer.parseInt(props.getProperty(TestProps.CI_INGEST_PAUSE_DURATION_MAX));
+ Preconditions.checkState(durationMax >= durationMin && durationMin > 0);
+ if (durationMax == durationMin) {
+ return durationMin;
+ }
+ return (rand.nextInt(durationMax - durationMin) + durationMin);
+ }
+
+ private static void pauseCheck(Properties props, Random rand) throws
InterruptedException {
+ if (pauseEnabled(props)) {
+ long elapsedNano = System.nanoTime() - lastPauseNs;
+ if (elapsedNano > (TimeUnit.SECONDS.toNanos(pauseWaitSec))) {
+ long pauseDurationSec = getPauseDurationSec(props, rand);
+ log.info("PAUSING for " + pauseDurationSec + "s");
+ Thread.sleep(TimeUnit.SECONDS.toMillis(pauseDurationSec));
+ lastPauseNs = System.nanoTime();
+ pauseWaitSec = getPauseWaitSec(props, rand);
+ log.info("INGESTING for " + pauseWaitSec + "s");
+ }
+ }
+ }
+
public static void main(String[] args) throws Exception {
if (args.length != 1) {
@@ -90,7 +137,7 @@ public class ContinuousIngest {
byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
- System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new
String(ingestInstanceId, UTF_8));
+ log.info(String.format("UUID %d %s", System.currentTimeMillis(), new
String(ingestInstanceId, UTF_8)));
long count = 0;
final int flushInterval = 1000000;
@@ -115,6 +162,13 @@ public class ContinuousIngest {
boolean checksum =
Boolean.parseBoolean(props.getProperty(TestProps.CI_INGEST_CHECKSUM));
long numEntries =
Long.parseLong(props.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+ if (pauseEnabled(props)) {
+ lastPauseNs = System.nanoTime();
+ pauseWaitSec = getPauseWaitSec(props, r);
+ log.info("PAUSING enabled");
+ log.info("INGESTING for " + pauseWaitSec + "s");
+ }
+
out: while (true) {
// generate first set of nodes
ColumnVisibility cv = getVisibility(r);
@@ -154,6 +208,7 @@ public class ContinuousIngest {
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
if (count >= numEntries)
break out;
+ pauseCheck(props, r);
}
// create one big linked list, this makes all of the first inserts
@@ -167,8 +222,8 @@ public class ContinuousIngest {
lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
if (count >= numEntries)
break out;
+ pauseCheck(props, r);
}
-
bw.close();
}
@@ -176,7 +231,7 @@ public class ContinuousIngest {
long t1 = System.currentTimeMillis();
bw.flush();
long t2 = System.currentTimeMillis();
- System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - lastFlushTime), (t2
- t1), count, flushInterval);
+ log.info(String.format("FLUSH %d %d %d %d %d", t2, (t2 - lastFlushTime),
(t2 - t1), count, flushInterval));
lastFlushTime = t2;
return lastFlushTime;
}