This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.19.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 6ec476e26d681e597ed47a3f691811679cf25549 Author: Clebert Suconic <[email protected]> AuthorDate: Fri Dec 10 22:24:07 2021 -0500 ARTEMIS-3604 Small test fix on ThresholdActorTest (cherry picked from commit af13d90c5747489346965189972adecbb37f4e11) --- .../artemis/utils/actors/ThresholdActorTest.java | 36 +++++++++++++++++----- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java index 5c715ec..01e5471 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java @@ -17,9 +17,11 @@ package org.apache.activemq.artemis.utils.actors; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -119,7 +121,7 @@ public class ThresholdActorTest { public void block() { try { - if (!semaphore.tryAcquire()) { + if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) { errors.incrementAndGet(); System.err.println("acquire failed"); } @@ -128,18 +130,36 @@ public class ThresholdActorTest { } } + public void unblock() { + semaphore.release(); + } + @Test public void testFlow() throws Exception { - final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ExecutorService executorService = Executors.newFixedThreadPool(2); try { - ThresholdActor<Element> actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, semaphore::release); - - final int LAST_ELEMENT = 1000; + ThresholdActor<Element> actor = new ThresholdActor<>(executorService, this::process, 20, (e) -> e.size, this::block, this::unblock); + + final int LAST_ELEMENT = 1111; + + final CountDownLatch latchDone = new CountDownLatch(1); + + executorService.execute(() -> { + for (int i = 0; i <= LAST_ELEMENT; i++) { + try { + semaphore.acquire(); + semaphore.release(); + actor.act(new Element(i, i % 2 == 0 ? 20 : 1)); + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + latchDone.countDown(); + }); - for (int i = 0; i <= LAST_ELEMENT; i++) { - actor.act(new Element(i, i % 2 == 0 ? 20 : 1)); - } + Assert.assertTrue(latchDone.await(10, TimeUnit.SECONDS)); Wait.assertEquals(LAST_ELEMENT, lastProcessed::get); Assert.assertEquals(0, errors.get());
