This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push:
new 711d28e Add deletes to continuous ingest (#166)
711d28e is described below
commit 711d28e83a91cbaa0754064a7d72c0f975f7de31
Author: Dom G <[email protected]>
AuthorDate: Fri Nov 19 13:04:02 2021 -0500
Add deletes to continuous ingest (#166)
* Add the ability for the deletion of entries to occur while running
continuous ingest
---
conf/accumulo-testing.properties | 3 +
.../org/apache/accumulo/testing/TestProps.java | 2 +
.../testing/continuous/ContinuousIngest.java | 85 ++++++++++++++++------
3 files changed, 69 insertions(+), 21 deletions(-)
diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 4f0e749..6ef5855 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -82,6 +82,9 @@ test.ci.ingest.pause.wait.max=180
test.ci.ingest.pause.duration.min=60
# Maximum pause duration (in seconds)
test.ci.ingest.pause.duration.max=120
+# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
+# To disable deletes, set probability to 0
+test.ci.ingest.delete.probability=0.1
# Batch walker
# ------------
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index c4c8948..e7801d3 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -94,6 +94,8 @@ public class TestProps {
public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + "pause.duration.max";
// Amount of data to write before flushing. Pause checks are only done after flush.
public static final String CI_INGEST_FLUSH_ENTRIES = CI_INGEST + "entries.flush";
+ // The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
+ public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + "delete.probability";
/** Batch Walker **/
// Sleep time between batch scans (in ms)
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 66f5152..459102f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -74,6 +74,14 @@ public class ContinuousIngest {
return (rand.nextInt(max - min) + min);
}
+ private static float getDeleteProbability(Properties props) {
+ String stringValue = props.getProperty(TestProps.CI_INGEST_DELETE_PROBABILITY);
+ float prob = Float.parseFloat(stringValue);
+ Preconditions.checkArgument(prob >= 0.0 && prob <= 1.0,
+ "Delete probability should be between 0.0 and 1.0");
+ return prob;
+ }
+
private static int getFlushEntries(Properties props) {
return Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, "1000000"));
}
@@ -128,11 +136,8 @@ public class ContinuousIngest {
// always want to point back to flushed data. This way the previous item should
// always exist in accumulo when verifying data. To do this make insert N point
// back to the row from insert (N - flushInterval). The array below is used to keep
- // track of this.
- long[] prevRows = new long[flushInterval];
- long[] firstRows = new long[flushInterval];
- int[] firstColFams = new int[flushInterval];
- int[] firstColQuals = new int[flushInterval];
+ // track of all inserts.
+ MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
long lastFlushTime = System.currentTimeMillis();
@@ -149,20 +154,21 @@ public class ContinuousIngest {
log.info("INGESTING for " + pauseWaitSec + "s");
}
+ final float deleteProbability = getDeleteProbability(testProps);
+ log.info("DELETES will occur with a probability of {}",
+ String.format("%.02f", deleteProbability));
+
out: while (true) {
- // generate first set of nodes
ColumnVisibility cv = getVisibility(r);
+ // generate first set of nodes
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, r);
- prevRows[index] = rowLong;
- firstRows[index] = rowLong;
int cf = r.nextInt(maxColF);
int cq = r.nextInt(maxColQ);
- firstColFams[index] = cf;
- firstColQuals[index] = cq;
+ nodeMap[0][index] = new MutationInfo(rowLong, cf, cq);
Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, checksum);
count++;
@@ -177,10 +183,12 @@ public class ContinuousIngest {
for (int depth = 1; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, r);
- byte[] prevRow = genRow(prevRows[index]);
- prevRows[index] = rowLong;
- Mutation m = genMutation(rowLong, r.nextInt(maxColF), r.nextInt(maxColQ), cv,
- ingestInstanceId, count, prevRow, checksum);
+ byte[] prevRow = genRow(nodeMap[depth - 1][index].row);
+ int cfInt = r.nextInt(maxColF);
+ int cqInt = r.nextInt(maxColQ);
+ nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
+ Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, count, prevRow,
+ checksum);
count++;
bw.addMutation(m);
}
@@ -191,14 +199,36 @@ public class ContinuousIngest {
pauseCheck(testProps, r);
}
- // create one big linked list, this makes all of the first inserts point to something
- for (int index = 0; index < flushInterval - 1; index++) {
- Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv,
- ingestInstanceId, count, genRow(prevRows[index + 1]), checksum);
- count++;
- bw.addMutation(m);
+ // random chance that the entries will be deleted
+ boolean delete = r.nextFloat() < deleteProbability;
+
+ // if the previously written entries are scheduled to be deleted
+ if (delete) {
+ log.info("Deleting last portion of written entries");
+ // add delete mutations in the reverse order in which they were written
+ for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
+ for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
+ MutationInfo currentNode = nodeMap[depth][index];
+ Mutation m = new Mutation(genRow(currentNode.row));
+ m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+ bw.addMutation(m);
+ }
+ lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+ pauseCheck(testProps, r);
+ }
+ } else {
+ // create one big linked list, this makes all the first inserts point to something
+ for (int index = 0; index < flushInterval - 1; index++) {
+ MutationInfo firstEntry = nodeMap[0][index];
+ MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
+ Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
+ ingestInstanceId, count, genRow(lastEntry.row), checksum);
+ count++;
+ bw.addMutation(m);
+ }
+ lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
}
- lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+
if (count >= numEntries)
break out;
pauseCheck(testProps, r);
@@ -207,6 +237,19 @@ public class ContinuousIngest {
}
}
+ private static class MutationInfo {
+
+ long row;
+ int cf;
+ int cq;
+
+ public MutationInfo(long row, int cf, int cq) {
+ this.row = row;
+ this.cf = cf;
+ this.cq = cq;
+ }
+ }
+
public static List<ColumnVisibility> parseVisibilities(String visString) {
List<ColumnVisibility> vis;
if (visString == null) {