ARTEMIS-1495 Fixing the in-handler executor and adding a benchmark to measure
the impact of the changes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/91db0807
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/91db0807
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/91db0807

Branch: refs/heads/master
Commit: 91db08072b221885f246a9db70abf3ee0bdf170d
Parents: 0fadc68
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Wed Nov 8 09:16:59 2017 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/actors/ArtemisExecutor.java   |  10 +-
 .../artemis/utils/actors/HandlerBase.java       |  47 ++++++
 .../artemis/utils/actors/ProcessorBase.java     | 153 +++++++++++++------
 .../utils/actors/OrderedExecutorSanityTest.java |  69 ++++++++-
 .../core/ServerSessionPacketHandler.java        |  73 +++------
 .../artemis/tests/util/ActiveMQTestBase.java    |   2 +-
 .../tests/integration/client/ConsumerTest.java  |   6 +-
 7 files changed, 260 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 5e72ef2..8efb3d3 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -40,9 +42,15 @@ public interface ArtemisExecutor extends Executor {
 
    /** It will wait the current execution (if there is one) to finish
     *  but will not complete any further executions */
-   default void shutdownNow() {
+   default List<Runnable> shutdownNow() {
+      return Collections.emptyList();
    }
 
+
+   default void shutdown() {
+   }
+
+
    /**
     * This will verify if the executor is flushed with no wait (or very 
minimal wait if not the {@link 
org.apache.activemq.artemis.utils.actors.OrderedExecutor}
     * @return

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
new file mode 100644
index 0000000..6bfbcb4
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+/**
+ * This abstract class encapsulates a ThreadLocal marker used to determine
+ * whether the current thread is executing inside this handler.
+ * This is needed because some functionality has to be avoided if inHandler().
+ *
+ */
+public abstract class HandlerBase {
+
+   //marker instance used to recognize if a thread is performing a packet handling
+   private static final Object DUMMY = Boolean.TRUE;
+
+   // this cannot be static as the Actor will be used within another executor. For that reason
+   // each instance will have its own ThreadLocal.
+   // A thread has this thread-local populated with DUMMY while it is performing a handler.
+   private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
+
+   protected void enter() {
+      assert inHandler.get() == null : "should be null";
+      inHandler.set(DUMMY);
+   }
+
+   public boolean inHandler() {
+      final Object dummy = inHandler.get();
+      return dummy != null;
+   }
+
+   protected void leave() {
+      assert inHandler.get() != null : "marker not set";
+      inHandler.set(null);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 73dbf2f..1c77a52 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -17,17 +17,24 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
 
-public abstract class ProcessorBase<T> {
+import org.jboss.logging.Logger;
 
-   private static final int STATE_NOT_RUNNING = 0;
-   private static final int STATE_RUNNING = 1;
-   private static final int STATE_FORCED_SHUTDOWN = 2;
+public abstract class ProcessorBase<T> extends HandlerBase {
+
+   private static final Logger logger = Logger.getLogger(ProcessorBase.class);
+
+   public static final int STATE_NOT_RUNNING = 0;
+   public static final int STATE_RUNNING = 1;
+   public static final int STATE_FORCED_SHUTDOWN = 2;
 
    protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
 
@@ -41,6 +48,8 @@ public abstract class ProcessorBase<T> {
 
    private volatile boolean requestedShutdown = false;
 
+   private volatile boolean started = true;
+
    private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater 
= AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
    private final class ExecutorTask implements Runnable {
@@ -50,19 +59,23 @@ public abstract class ProcessorBase<T> {
          do {
             //if there is no thread active and is not already dead then we run
             if (stateUpdater.compareAndSet(ProcessorBase.this, 
STATE_NOT_RUNNING, STATE_RUNNING)) {
+               enter();
                try {
                   T task = tasks.poll();
                   //while the queue is not empty we process in order
-                  while (task != null) {
+                  while (task != null && !requestedShutdown) {
                      //just drain the tasks if has been requested a shutdown 
to help the shutdown process
-                     if (!requestedShutdown) {
-                        doTask(task);
+                     if (requestedShutdown) {
+                        tasks.add(task);
+                        break;
                      }
+                     doTask(task);
                      task = tasks.poll();
                   }
                } finally {
+                  leave();
                   //set state back to not running.
-                  stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
+                  stateUpdater.compareAndSet(ProcessorBase.this, 
STATE_RUNNING, STATE_NOT_RUNNING);
                }
             } else {
                return;
@@ -75,31 +88,57 @@ public abstract class ProcessorBase<T> {
       }
    }
 
-   /** It will wait the current execution (if there is one) to finish
-    *  but will not complete any further executions */
-   public void shutdownNow() {
+   /**
+    * It will shut down, waiting up to 30 seconds for the executor to flush.
+    */
+   public void shutdown() {
+      shutdown(30, TimeUnit.SECONDS);
+   }
+
+   public void shutdown(long timeout, TimeUnit unit) {
+      started = false;
+
+      if (!inHandler()) {
+         // only flush when called from outside the handler; if it's in handler we just return
+         flush(timeout, unit);
+      }
+   }
+
+   /**
+    * It will wait the current execution (if there is one) to finish
+    * but will not complete any further executions
+    */
+   public List<T> shutdownNow() {
       //alert anyone that has been requested (at least) an immediate shutdown
       requestedShutdown = true;
-      //it could take a very long time depending on the current executing task
-      do {
-         //alert the ExecutorTask (if is running) to just drain the current 
backlog of tasks
-         final int startState = stateUpdater.get(this);
-         if (startState == STATE_FORCED_SHUTDOWN) {
-            //another thread has completed a forced shutdown
-            return;
-         }
-         if (startState == STATE_RUNNING) {
-            //wait 100 ms to avoid burning CPU while waiting and
-            //give other threads a chance to make progress
-            LockSupport.parkNanos(100_000_000L);
+      started = false;
+
+      if (inHandler()) {
+         stateUpdater.set(this, STATE_FORCED_SHUTDOWN);
+      } else {
+         //it could take a very long time depending on the current executing 
task
+         do {
+            //alert the ExecutorTask (if is running) to just drain the current 
backlog of tasks
+            final int startState = stateUpdater.get(this);
+            if (startState == STATE_FORCED_SHUTDOWN) {
+               //another thread has completed a forced shutdown
+               break;
+            }
+            if (startState == STATE_RUNNING) {
+               //wait 100 ms to avoid burning CPU while waiting and
+               //give other threads a chance to make progress
+               LockSupport.parkNanos(100_000_000L);
+            }
          }
+         while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, 
STATE_FORCED_SHUTDOWN));
+         //this could happen just one time: the forced shutdown state is the 
last one and
+         //can be set by just one caller.
+         //As noted on the execute method there is a small chance that some 
tasks would be enqueued
       }
-      while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, 
STATE_FORCED_SHUTDOWN));
-      //this could happen just one time: the forced shutdown state is the last 
one and
-      //can be set by just one caller.
-      //As noted on the execute method there is a small chance that some tasks 
would be enqueued
+      ArrayList<T> returnList = new ArrayList<>(tasks);
       tasks.clear();
-      //we can report the killed tasks somehow: ExecutorService do the same on 
shutdownNow
+
+      return returnList;
    }
 
    protected abstract void doTask(T task);
@@ -112,26 +151,48 @@ public abstract class ProcessorBase<T> {
       return stateUpdater.get(this) == STATE_NOT_RUNNING;
    }
 
-   protected void task(T command) {
-      if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) {
-         //The shutdown process could finish right after the above check: 
shutdownNow can drain the remaining tasks
-         tasks.add(command);
-         //cache locally the state to avoid multiple volatile loads
-         final int state = stateUpdater.get(this);
-         if (state == STATE_FORCED_SHUTDOWN) {
-            //help the GC by draining any task just submitted: it help to 
cover the case of a shutdownNow finished before tasks.add
-            tasks.clear();
-         } else if (state == STATE_NOT_RUNNING) {
-            //startPoller could be deleted but is maintained because is 
inherited
-            delegate.execute(task);
+   /**
+    * WARNING: This will only flush when all the activity is suspended.
+    * Don't expect success on this call if another thread keeps feeding the queue;
+    * this is only valid in situations where you are not feeding the queue,
+    * like shutdown and failover.
+    */
+   public final boolean flush(long timeout, TimeUnit unit) {
+      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+         // quick test, most of the time it will be empty anyways
+         return true;
+      }
+
+      long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout);
+      try {
+         while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > 
System.currentTimeMillis()) {
+
+            if (tasks.isEmpty()) {
+               return true;
+            }
+
+            Thread.sleep(10);
          }
+      } catch (InterruptedException e) {
+         // ignored
       }
+
+      return stateUpdater.get(this) == STATE_NOT_RUNNING;
    }
 
-   protected void startPoller() {
-      if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
-         //note that this can result in multiple tasks being queued
-         //this is not an issue as the CAS will mean that the second (and 
subsequent) execution is ignored
+   protected void task(T command) {
+      if (!started) {
+         logger.debug("Ordered executor has been shutdown at", new 
Exception("debug"));
+      }
+      //The shutdown process could finish right after the above check: 
shutdownNow can drain the remaining tasks
+      tasks.add(command);
+      //cache locally the state to avoid multiple volatile loads
+      final int state = stateUpdater.get(this);
+      if (state == STATE_FORCED_SHUTDOWN) {
+         //help the GC by draining any task just submitted: it helps to cover the case of a shutdownNow finished before tasks.add
+         tasks.clear();
+      } else if (state == STATE_NOT_RUNNING) {
+         //startPoller could be deleted but is maintained because is inherited
          delegate.execute(task);
       }
    }
@@ -146,4 +207,8 @@ public abstract class ProcessorBase<T> {
       return tasks.size();
    }
 
+   public final int status() {
+      return stateUpdater.get(this);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
index 9446f50..4e2bbba 100644
--- 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,7 +71,7 @@ public class OrderedExecutorSanityTest {
          //from now on new tasks won't be executed
          final CountDownLatch afterDeatchExecution = new CountDownLatch(1);
          executor.execute(afterDeatchExecution::countDown);
-         Assert.assertFalse("After shutdownNow no new tasks can be executed", 
afterDeatchExecution.await(1, TimeUnit.SECONDS));
+         Assert.assertFalse("After shutdownNow no new tasks can be executed", 
afterDeatchExecution.await(100, TimeUnit.MILLISECONDS));
          //to avoid memory leaks the executor must take care of the new 
submitted tasks immediatly
          Assert.assertEquals("Any new task submitted after death must be 
collected", 0, executor.remaining());
       } finally {
@@ -78,4 +79,70 @@ public class OrderedExecutorSanityTest {
       }
    }
 
+
+
+   @Test
+   public void shutdownWithin() throws InterruptedException {
+      final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+      try {
+         final OrderedExecutor executor = new OrderedExecutor(executorService);
+         final CountDownLatch latch = new CountDownLatch(1);
+         final AtomicInteger numberOfTasks = new AtomicInteger(0);
+         final CountDownLatch ran = new CountDownLatch(1);
+
+         executor.execute(() -> {
+            try {
+               latch.await(1, TimeUnit.MINUTES);
+               numberOfTasks.set(executor.shutdownNow().size());
+               ran.countDown();
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         });
+
+
+         for (int i = 0; i < 100; i++) {
+            executor.execute(() -> System.out.println("Dont worry, this will 
never happen"));
+         }
+
+         latch.countDown();
+         ran.await(1, TimeUnit.SECONDS);
+         Assert.assertEquals(100, numberOfTasks.get());
+
+         Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, 
executor.status());
+         Assert.assertEquals(0, executor.remaining());
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
+
+   @Test
+   public void testMeasure() throws InterruptedException {
+      final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+      try {
+         final OrderedExecutor executor = new OrderedExecutor(executorService);
+         int MAX_LOOP = 1_000_000;
+
+         // increase this number for longer runs
+         int runs = 10;
+
+         for (int i = 0; i < runs; i++) {
+            long start = System.nanoTime();
+            final CountDownLatch executed = new CountDownLatch(MAX_LOOP);
+            for (int l = 0; l < MAX_LOOP; l++) {
+               executor.execute(executed::countDown);
+            }
+            Assert.assertTrue(executed.await(1, TimeUnit.MINUTES));
+            long end = System.nanoTime();
+
+            long elapsed = (end - start);
+
+            System.out.println("execution " + i + " in " + 
TimeUnit.NANOSECONDS.toMillis(elapsed) + " milliseconds");
+         }
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index f78f43f..e1e1b68 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -159,11 +159,6 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
 
    private final boolean direct;
 
-   //marker instance used to recognize if a thread is performing a packet 
handling
-   private static final Object DUMMY = Boolean.TRUE;
-
-   //a thread that has its thread-local map populated with DUMMY is performing 
a packet handling
-   private static final ThreadLocal<Object> inHandler = new ThreadLocal<>();
 
    public ServerSessionPacketHandler(final ActiveMQServer server,
                                      final CoreProtocolManager manager,
@@ -231,26 +226,9 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName());
    }
 
-   private static void onStartMessagePacketHandler() {
-      assert inHandler.get() == null : "recursion on packet handling is not 
supported";
-      inHandler.set(DUMMY);
-   }
-
-   private static boolean inHandler() {
-      final Object dummy = inHandler.get();
-      //sanity check: can't exist a thread using a marker different from DUMMY
-      assert ((dummy != null && dummy == DUMMY) || dummy == null) : "wrong 
marker";
-      return dummy != null;
-   }
-
-   private static void onExitMessagePacketHandler() {
-      assert inHandler.get() != null : "marker not set";
-      inHandler.set(null);
-   }
-
    public void closeExecutors() {
-      packetActor.shutdownNow();
-      callExecutor.shutdownNow();
+      packetActor.shutdown();
+      callExecutor.shutdown();
    }
 
    public void close() {
@@ -280,33 +258,28 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       if (logger.isTraceEnabled()) {
          logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
       }
-      onStartMessagePacketHandler();
-      try {
-         final byte type = packet.getType();
-         switch (type) {
-            case SESS_SEND: {
-               onSessionSend(packet);
-               break;
-            }
-            case SESS_ACKNOWLEDGE: {
-               onSessionAcknowledge(packet);
-               break;
-            }
-            case SESS_PRODUCER_REQUEST_CREDITS: {
-               onSessionRequestProducerCredits(packet);
-               break;
-            }
-            case SESS_FLOWTOKEN: {
-               onSessionConsumerFlowCredit(packet);
-               break;
-            }
-            default:
-               // separating a method for everything else as JIT was faster 
this way
-               slowPacketHandler(packet);
-               break;
+      final byte type = packet.getType();
+      switch (type) {
+         case SESS_SEND: {
+            onSessionSend(packet);
+            break;
          }
-      } finally {
-         onExitMessagePacketHandler();
+         case SESS_ACKNOWLEDGE: {
+            onSessionAcknowledge(packet);
+            break;
+         }
+         case SESS_PRODUCER_REQUEST_CREDITS: {
+            onSessionRequestProducerCredits(packet);
+            break;
+         }
+         case SESS_FLOWTOKEN: {
+            onSessionConsumerFlowCredit(packet);
+            break;
+         }
+         default:
+            // separating a method for everything else as JIT was faster this 
way
+            slowPacketHandler(packet);
+            break;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index a63eec7..2d6f003 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -302,7 +302,7 @@ public abstract class ActiveMQTestBase extends Assert {
          //clean up pools before failing
          if (!exceptions.isEmpty()) {
             for (Exception exception : exceptions) {
-               exception.printStackTrace();
+               exception.printStackTrace(System.out);
             }
             fail("Client Session Factories still trying to reconnect, see 
above to see where created");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 9c05114..ef53344 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -330,9 +330,9 @@ public class ConsumerTest extends ActiveMQTestBase {
          connection.close();
       }
 
-      assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
-      assertNull(server.locateQueue(SimpleString.toSimpleString("queue")));
-      assertEquals(0, server.getTotalMessageCount());
+      Wait.assertTrue(() -> 
server.getAddressInfo(SimpleString.toSimpleString("queue")) == null);
+      Wait.assertTrue(() -> 
server.locateQueue(SimpleString.toSimpleString("queue")) == null);
+      Wait.assertEquals(0, server::getTotalMessageCount);
    }
 
    @Test

Reply via email to