This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 860054df8e fix MemoryStarvedScanIT tests (#3616)
860054df8e is described below
commit 860054df8e909dee2a5658c992d55d351697ae55
Author: EdColeman <[email protected]>
AuthorDate: Mon Jul 17 13:36:49 2023 +0000
fix MemoryStarvedScanIT tests (#3616)
This should fix immediate issue - but there may be other improvements need
to reliably run in constrained CI environment.
---
.../test/functional/MemoryStarvedScanIT.java | 73 +++++++++++++---------
1 file changed, 45 insertions(+), 28 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 70d80f746f..d06cef1308 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.test.functional;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.accumulo.core.metrics.MetricsProducer.METRICS_LOW_MEMORY;
import static org.apache.accumulo.test.util.Wait.waitFor;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -222,16 +224,15 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
Scanner memoryConsumingScanner = client.createScanner(table)) {
dataConsumingScanner.addScanIterator(
- new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime",
"500")));
+ new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime",
"2000")));
dataConsumingScanner.setBatchSize(1);
dataConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
AtomicInteger fetched = new AtomicInteger(0);
Thread t = new Thread(() -> {
- int i = 0;
while (iter.hasNext()) {
iter.next();
- fetched.set(++i);
+ fetched.incrementAndGet();
}
});
@@ -258,7 +259,7 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
// Confirm that some data was fetched by the memoryConsumingScanner
currentCount = fetched.get();
assertTrue(currentCount > 0 && currentCount < 100);
- LOG.info("Memory consumed");
+ LOG.info("Memory consumed after reading {} rows", currentCount);
// Grab the current metric counts, wait
final double returned = SCAN_RETURNED_EARLY.doubleValue();
@@ -339,21 +340,22 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
// check memory okay before starting
assertEquals(0, LOW_MEM_DETECTED.get());
- ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+ // generate enough data so more than one batch is returned.
+ ReadWriteIT.ingest(client, 1000, 3, 10, 0, table);
try (BatchScanner dataConsumingScanner =
client.createBatchScanner(table);
Scanner memoryConsumingScanner = client.createScanner(table)) {
+ // add enough delay that batch does not return all rows and we get
more than one batch
dataConsumingScanner.addScanIterator(
- new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime",
"500")));
+ new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime",
"50")));
dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
AtomicInteger fetched = new AtomicInteger(0);
Thread t = new Thread(() -> {
- int i = 0;
while (iter.hasNext()) {
iter.next();
- fetched.set(++i);
+ fetched.incrementAndGet();
}
});
@@ -364,40 +366,36 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
t.start();
- // Wait until the dataConsumingScanner has started fetching data
- int currentCount = fetched.get();
- while (currentCount == 0) {
- Thread.sleep(500);
- currentCount = fetched.get();
- }
+ // Wait for a batch to be returned
+ waitFor(() -> fetched.get() > 0, MINUTES.toMillis(5), 200);
// This should block until the GarbageCollectionLogger runs and
notices that the
// VM is low on memory.
Iterator<Entry<Key,Value>> consumingIter =
memoryConsumingScanner.iterator();
assertTrue(consumingIter.hasNext());
- // Confirm that some data was fetched by the dataConsumingScanner
- currentCount = fetched.get();
- assertTrue(currentCount > 0 && currentCount < 100);
-
- // Grab the current paused count, wait two seconds and then confirm
that
- // the number of rows fetched by the memoryConsumingScanner has not
increased
+ // Grab the current paused count, the number of rows fetched by the
memoryConsumingScanner
+ // has not increased
// and that the scan delay counter has increased.
final double returned = SCAN_RETURNED_EARLY.doubleValue();
final double paused = SCAN_START_DELAYED.doubleValue();
- final int currentCountCopy = currentCount;
- waitFor(
- () -> (currentCountCopy == fetched.get() &&
SCAN_START_DELAYED.doubleValue() > paused)
- || (currentCountCopy + 1 == fetched.get()
- && SCAN_RETURNED_EARLY.doubleValue() > returned));
+ // Confirm that some data was fetched by the dataConsumingScanner
+ int currentCount = fetched.get();
+ LOG.info("rows read in first batch: {}", currentCount);
+ // check some, but not all rows have been read
+ assertTrue(currentCount > 0 && currentCount < 3000);
+
+ final int startCount = currentCount;
+ waitFor(() -> verifyBatchedStalled(fetched.get(), startCount, paused,
returned),
+ MINUTES.toMillis(2), SECONDS.toMillis(2));
waitFor(() -> 1 == LOW_MEM_DETECTED.get());
- // Perform the check again
+ // Perform the check again - checking that rows fetched not advancing
final double paused2 = SCAN_START_DELAYED.doubleValue();
final double returned2 = SCAN_RETURNED_EARLY.doubleValue();
Thread.sleep(1500);
- assertEquals(currentCount, fetched.get());
+ assertEquals(startCount, fetched.get());
assertTrue(SCAN_START_DELAYED.doubleValue() >= paused2);
assertEquals(returned2, SCAN_RETURNED_EARLY.doubleValue());
waitFor(() -> 1 == LOW_MEM_DETECTED.get());
@@ -405,8 +403,11 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
// Free the memory which will allow the pausing scanner to continue
freeServerMemory(client);
+ waitFor(() -> 0 == LOW_MEM_DETECTED.get());
+
t.join();
- assertEquals(30, fetched.get());
+ // check that remain rows have been read
+ assertEquals(3000, fetched.get());
} finally {
to.delete(table);
@@ -414,6 +415,22 @@ public class MemoryStarvedScanIT extends
SharedMiniClusterBase {
}
}
+ private boolean verifyBatchedStalled(final int currCount, final int
startCount,
+ final double paused, final double returned) {
+ if (startCount == currCount && SCAN_START_DELAYED.doubleValue() > paused) {
+ LOG.debug("found expected pause because of low memory");
+ return true;
+ }
+ if (startCount == currCount && SCAN_RETURNED_EARLY.doubleValue() >
returned) {
+ LOG.debug("found expected early return because of low memory");
+ return true;
+ }
+ LOG.info(
+ "waiting for low memory pause. prev count: {}, curr count: {}, paused:
{}, returned: {}",
+ startCount, currCount, SCAN_START_DELAYED.doubleValue(),
SCAN_RETURNED_EARLY.doubleValue());
+ return false;
+ }
+
/**
* Check that the low memory condition is set and remains set until free
memory is available.
*/