This is an automated email from the ASF dual-hosted git repository. djoshi pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new f9ddaf1 Fix flaky SEPExecutor.changingMaxWorkersMeetsConcurrencyGoalsTest f9ddaf1 is described below commit f9ddaf1841147fc284e802739ca42403aa2816ae Author: Jon Meredith <jmeredit...@apple.com> AuthorDate: Thu Apr 9 16:59:53 2020 -0600 Fix flaky SEPExecutor.changingMaxWorkersMeetsConcurrencyGoalsTest Thread scheduling is not guaranteed to be fair and having the BusyWork tasks reschedule themselves makes sure there is always more work for the SEPWorker once it finishes, so it can hog all the CPU if run with a low number of cores. To randomize the scheduling better, introduce a second thread that keeps the executor primed with work, but guarantees a thread switch by waiting on the semaphore. Also resolves a cleanup bug - the sharedPool was not being shut down correctly. Patch by Jon Meredith; reviewed by David Capwell and Dinesh Joshi for CASSANDRA-15709 --- .../cassandra/concurrent/SEPExecutorTest.java | 85 +++++++++++++--------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java index e3d8556..9a2d52d 100644 --- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java +++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java @@ -23,7 +23,9 @@ import java.io.PrintStream; import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,6 +34,7 @@ import org.junit.Test; import org.apache.cassandra.utils.FBUtilities; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; public class SEPExecutorTest @@ -75,48 
+78,68 @@ public class SEPExecutorTest } @Test - public void changingMaxWorkersMeetsConcurrencyGoalsTest() throws InterruptedException + public void changingMaxWorkersMeetsConcurrencyGoalsTest() throws InterruptedException, TimeoutException { - final int numBusyWorkers = 2; // Number of busy worker threads to run and gum things up + // Number of busy worker threads to run and gum things up. Chosen to be + // between the low and high max pool size so the test exercises resizing + // under a number of different conditions. + final int numBusyWorkers = 2; SharedExecutorPool sharedPool = new SharedExecutorPool("ChangingMaxWorkersMeetsConcurrencyGoalsTest"); final AtomicInteger notifiedMaxPoolSize = new AtomicInteger(); LocalAwareExecutorService executor = sharedPool.newExecutor(0, notifiedMaxPoolSize::set, 4, "internal", "resizetest"); + // Keep feeding the executor work while resizing + // so it stays under load. AtomicBoolean stayBusy = new AtomicBoolean(true); - for (int i = 0; i < numBusyWorkers; i++) - { - executor.execute(new BusyWork(executor, stayBusy)); - } + Semaphore busyWorkerPermits = new Semaphore(numBusyWorkers); + Thread makeBusy = new Thread(() -> { + while (stayBusy.get() == true) + { + try + { + if (busyWorkerPermits.tryAcquire(1, MILLISECONDS)) { + executor.execute(new BusyWork(busyWorkerPermits)); + } + } + catch (InterruptedException e) + { + // ignore, will either stop looping if done or retry the lock + } + } + }); - final int previousConcurrency = executor.getMaximumPoolSize(); + makeBusy.start(); try { - assertMaxTaskConcurrency(executor, 1); - Assert.assertEquals(1, notifiedMaxPoolSize.get()); + for (int repeat = 0; repeat < 1000; repeat++) + { + assertMaxTaskConcurrency(executor, 1); + Assert.assertEquals(1, notifiedMaxPoolSize.get()); - assertMaxTaskConcurrency(executor, 2); - Assert.assertEquals(2, notifiedMaxPoolSize.get()); + assertMaxTaskConcurrency(executor, 2); + Assert.assertEquals(2, notifiedMaxPoolSize.get()); - 
assertMaxTaskConcurrency(executor, 1); - Assert.assertEquals(1, notifiedMaxPoolSize.get()); + assertMaxTaskConcurrency(executor, 1); + Assert.assertEquals(1, notifiedMaxPoolSize.get()); - assertMaxTaskConcurrency(executor, 3); - Assert.assertEquals(3, notifiedMaxPoolSize.get()); + assertMaxTaskConcurrency(executor, 3); + Assert.assertEquals(3, notifiedMaxPoolSize.get()); - executor.setMaximumPoolSize(0); - Assert.assertEquals(0, notifiedMaxPoolSize.get()); + executor.setMaximumPoolSize(0); + Assert.assertEquals(0, notifiedMaxPoolSize.get()); - assertMaxTaskConcurrency(executor, 4); - Assert.assertEquals(4, notifiedMaxPoolSize.get()); + assertMaxTaskConcurrency(executor, 4); + Assert.assertEquals(4, notifiedMaxPoolSize.get()); + } } finally { stayBusy.set(false); - executor.setMaximumPoolSize(previousConcurrency); - executor.shutdownNow(); - Assert.assertTrue(executor.isShutdown()); - Assert.assertTrue(executor.awaitTermination(1L, TimeUnit.MINUTES)); + makeBusy.join(TimeUnit.SECONDS.toMillis(5)); + Assert.assertFalse("makeBusy thread should have checked stayBusy and exited", + makeBusy.isAlive()); + sharedPool.shutdownAndWait(1L, MINUTES); } } @@ -149,21 +172,16 @@ public class SEPExecutorTest static class BusyWork implements Runnable { - private ExecutorService executor; - private AtomicBoolean stayBusy; + private Semaphore busyWorkers; - public BusyWork(ExecutorService executor, AtomicBoolean stayBusy) + public BusyWork(Semaphore busyWorkers) { - this.executor = executor; - this.stayBusy = stayBusy; + this.busyWorkers = busyWorkers; } public void run() { - if (stayBusy.get()) - { - executor.execute(new BusyWork(executor, stayBusy)); - } + busyWorkers.release(); } } @@ -177,6 +195,7 @@ public class SEPExecutorTest executor.execute(new LatchWaiter(concurrencyGoal, 5L, TimeUnit.SECONDS)); } // Will return true if all of the LatchWaiters count down before the timeout - Assert.assertEquals(true, concurrencyGoal.await(3L, TimeUnit.SECONDS)); + Assert.assertEquals("Test 
tasks did not hit max concurrency goal", + true, concurrencyGoal.await(3L, TimeUnit.SECONDS)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org