This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 860054df8e fix MemoryStarvedScanIT tests (#3616)
860054df8e is described below

commit 860054df8e909dee2a5658c992d55d351697ae55
Author: EdColeman <[email protected]>
AuthorDate: Mon Jul 17 13:36:49 2023 +0000

    fix MemoryStarvedScanIT tests (#3616)
    
    This should fix the immediate issue - but there may be other improvements needed 
to reliably run in a constrained CI environment.
---
 .../test/functional/MemoryStarvedScanIT.java       | 73 +++++++++++++---------
 1 file changed, 45 insertions(+), 28 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
index 70d80f746f..d06cef1308 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.accumulo.core.metrics.MetricsProducer.METRICS_LOW_MEMORY;
 import static org.apache.accumulo.test.util.Wait.waitFor;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -222,16 +224,15 @@ public class MemoryStarvedScanIT extends 
SharedMiniClusterBase {
           Scanner memoryConsumingScanner = client.createScanner(table)) {
 
         dataConsumingScanner.addScanIterator(
-            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"500")));
+            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"2000")));
         dataConsumingScanner.setBatchSize(1);
         dataConsumingScanner.setReadaheadThreshold(Long.MAX_VALUE);
         Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
         AtomicInteger fetched = new AtomicInteger(0);
         Thread t = new Thread(() -> {
-          int i = 0;
           while (iter.hasNext()) {
             iter.next();
-            fetched.set(++i);
+            fetched.incrementAndGet();
           }
         });
 
@@ -258,7 +259,7 @@ public class MemoryStarvedScanIT extends 
SharedMiniClusterBase {
         // Confirm that some data was fetched by the memoryConsumingScanner
         currentCount = fetched.get();
         assertTrue(currentCount > 0 && currentCount < 100);
-        LOG.info("Memory consumed");
+        LOG.info("Memory consumed after reading {} rows", currentCount);
 
         // Grab the current metric counts, wait
         final double returned = SCAN_RETURNED_EARLY.doubleValue();
@@ -339,21 +340,22 @@ public class MemoryStarvedScanIT extends 
SharedMiniClusterBase {
       // check memory okay before starting
       assertEquals(0, LOW_MEM_DETECTED.get());
 
-      ReadWriteIT.ingest(client, 10, 3, 10, 0, table);
+      // generate enough data so more than one batch is returned.
+      ReadWriteIT.ingest(client, 1000, 3, 10, 0, table);
 
       try (BatchScanner dataConsumingScanner = 
client.createBatchScanner(table);
           Scanner memoryConsumingScanner = client.createScanner(table)) {
 
+        // add enough delay that batch does not return all rows and we get 
more than one batch
         dataConsumingScanner.addScanIterator(
-            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"500")));
+            new IteratorSetting(11, SlowIterator.class, Map.of("sleepTime", 
"50")));
         dataConsumingScanner.setRanges(Collections.singletonList(new Range()));
         Iterator<Entry<Key,Value>> iter = dataConsumingScanner.iterator();
         AtomicInteger fetched = new AtomicInteger(0);
         Thread t = new Thread(() -> {
-          int i = 0;
           while (iter.hasNext()) {
             iter.next();
-            fetched.set(++i);
+            fetched.incrementAndGet();
           }
         });
 
@@ -364,40 +366,36 @@ public class MemoryStarvedScanIT extends 
SharedMiniClusterBase {
 
         t.start();
 
-        // Wait until the dataConsumingScanner has started fetching data
-        int currentCount = fetched.get();
-        while (currentCount == 0) {
-          Thread.sleep(500);
-          currentCount = fetched.get();
-        }
+        // Wait for a batch to be returned
+        waitFor(() -> fetched.get() > 0, MINUTES.toMillis(5), 200);
 
         // This should block until the GarbageCollectionLogger runs and 
notices that the
         // VM is low on memory.
         Iterator<Entry<Key,Value>> consumingIter = 
memoryConsumingScanner.iterator();
         assertTrue(consumingIter.hasNext());
 
-        // Confirm that some data was fetched by the dataConsumingScanner
-        currentCount = fetched.get();
-        assertTrue(currentCount > 0 && currentCount < 100);
-
-        // Grab the current paused count, wait two seconds and then confirm 
that
-        // the number of rows fetched by the memoryConsumingScanner has not 
increased
+        // Grab the current paused count, the number of rows fetched by the 
memoryConsumingScanner
+        // has not increased
         // and that the scan delay counter has increased.
         final double returned = SCAN_RETURNED_EARLY.doubleValue();
         final double paused = SCAN_START_DELAYED.doubleValue();
 
-        final int currentCountCopy = currentCount;
-        waitFor(
-            () -> (currentCountCopy == fetched.get() && 
SCAN_START_DELAYED.doubleValue() > paused)
-                || (currentCountCopy + 1 == fetched.get()
-                    && SCAN_RETURNED_EARLY.doubleValue() > returned));
+        // Confirm that some data was fetched by the dataConsumingScanner
+        int currentCount = fetched.get();
+        LOG.info("rows read in first batch: {}", currentCount);
+        // check some, but not all rows have been read
+        assertTrue(currentCount > 0 && currentCount < 3000);
+
+        final int startCount = currentCount;
+        waitFor(() -> verifyBatchedStalled(fetched.get(), startCount, paused, 
returned),
+            MINUTES.toMillis(2), SECONDS.toMillis(2));
         waitFor(() -> 1 == LOW_MEM_DETECTED.get());
 
-        // Perform the check again
+        // Perform the check again - checking that rows fetched not advancing
         final double paused2 = SCAN_START_DELAYED.doubleValue();
         final double returned2 = SCAN_RETURNED_EARLY.doubleValue();
         Thread.sleep(1500);
-        assertEquals(currentCount, fetched.get());
+        assertEquals(startCount, fetched.get());
         assertTrue(SCAN_START_DELAYED.doubleValue() >= paused2);
         assertEquals(returned2, SCAN_RETURNED_EARLY.doubleValue());
         waitFor(() -> 1 == LOW_MEM_DETECTED.get());
@@ -405,8 +403,11 @@ public class MemoryStarvedScanIT extends 
SharedMiniClusterBase {
         // Free the memory which will allow the pausing scanner to continue
         freeServerMemory(client);
 
+        waitFor(() -> 0 == LOW_MEM_DETECTED.get());
+
         t.join();
-        assertEquals(30, fetched.get());
+        // check that remaining rows have been read
+        assertEquals(3000, fetched.get());
 
       } finally {
         to.delete(table);
@@ -414,6 +415,22 @@ public class MemoryStarvedScanIT extends 
SharedMiniClusterBase {
     }
   }
 
+  private boolean verifyBatchedStalled(final int currCount, final int 
startCount,
+      final double paused, final double returned) {
+    if (startCount == currCount && SCAN_START_DELAYED.doubleValue() > paused) {
+      LOG.debug("found expected pause because of low memory");
+      return true;
+    }
+    if (startCount == currCount && SCAN_RETURNED_EARLY.doubleValue() > 
returned) {
+      LOG.debug("found expected early return because of low memory");
+      return true;
+    }
+    LOG.info(
+        "waiting for low memory pause. prev count: {}, curr count: {}, paused: 
{}, returned: {}",
+        startCount, currCount, SCAN_START_DELAYED.doubleValue(), 
SCAN_RETURNED_EARLY.doubleValue());
+    return false;
+  }
+
   /**
    * Check that the low memory condition is set and remains set until free 
memory is available.
    */

Reply via email to