This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7005dcd004994c8e771015f948ec4d23f8c35dcd
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Dec 10 12:41:50 2021 -0500

    ARTEMIS-3604 Async sends overflowing server in OpenWire
    
    (cherry picked from commit 1e6297957766138c312273cbbd15327c22f91c53)
---
 .../artemis/utils/actors/ThresholdActor.java       |  96 +++++++++
 .../artemis/utils/actors/ThresholdActorTest.java   | 152 ++++++++++++++
 .../core/protocol/openwire/OpenWireConnection.java |  62 +++++-
 .../protocol/openwire/OpenWireProtocolManager.java |  32 ++-
 .../core/protocol/openwire/amq/AMQSession.java     |  24 +--
 .../activemq/artemis/core/paging/PagingStore.java  |   2 +-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |   9 +-
 .../core/remoting/impl/netty/NettyAcceptor.java    |   4 +
 .../storage/PersistMultiThreadTest.java            |   2 +-
 .../src/main/resources/servers/paging/broker.xml   |   2 +-
 .../smoke/paging/FloodServerWithAsyncSendTest.java | 221 +++++++++++++++++++++
 11 files changed, 580 insertions(+), 26 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
new file mode 100644
index 0000000..c7e51d7
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.ToIntFunction;
+
+import org.jboss.logging.Logger;
+
+/**
+ * An actor that keeps an estimate of the size of the elements still waiting to be processed
+ * and signals when that pending size goes over a configured maximum.
+ * <p>
+ * {@code act} adds the element's estimated size (as returned by {@code sizeGetter}) to a pending
+ * counter before queueing it, and {@code doTask} releases the same amount once the listener has
+ * handled the element. When the counter exceeds {@code maxSize} a flush is scheduled:
+ * {@code overThreshold} runs immediately on the calling thread, and {@code clearThreshold} runs
+ * later, when the flush marker queued through {@code task(...)} is handed to {@code doTask}.
+ * At most one flush is outstanding at any time.
+ *
+ * @param <T> the type of element processed by the listener
+ */
+public class ThresholdActor<T> extends ProcessorBase<Object> {
+
+   private static final Logger logger = Logger.getLogger(ThresholdActor.class);
+
+   // estimated size of the elements accepted by act() and not yet processed by doTask()
+   private static final AtomicIntegerFieldUpdater<ThresholdActor> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThresholdActor.class, "size");
+   private volatile int size = 0;
+
+   // 1 while a FLUSH marker is queued and not yet processed, 0 otherwise
+   private static final AtomicIntegerFieldUpdater<ThresholdActor> SCHEDULED_FLUSH_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThresholdActor.class, "scheduledFlush");
+   private volatile int scheduledFlush = 0;
+
+   // marker task, compared by identity in doTask()
+   private static final Object FLUSH = new Object();
+
+   private final int maxSize;
+   private final ToIntFunction<T> sizeGetter;
+   private final ActorListener<T> listener;
+   private final Runnable overThreshold;
+   private final Runnable clearThreshold;
+
+   /**
+    * @param parent         the executor handed to {@code ProcessorBase}
+    * @param listener       receives every element passed to {@link #act}
+    * @param maxSize        pending size above which a flush is scheduled
+    * @param sizeGetter     returns the estimated size of an element; values that are not positive
+    *                       are not accounted (a debug message is logged instead)
+    * @param overThreshold  run by {@link #flush()} on the calling thread when a flush is scheduled
+    * @param clearThreshold run from {@code doTask} when the scheduled flush marker is processed
+    */
+   public ThresholdActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable clearThreshold) {
+      super(parent);
+      this.listener = listener;
+      this.maxSize = maxSize;
+      this.sizeGetter = sizeGetter;
+      this.overThreshold = overThreshold;
+      this.clearThreshold = clearThreshold;
+   }
+
+   /**
+    * Processes one queued task: either the flush marker, which runs {@code clearThreshold}, or a
+    * regular element, which is passed to the listener and then has its size released.
+    */
+   @Override
+   protected final void doTask(Object task) {
+      if (task == FLUSH) {
+         try {
+            clearThreshold.run();
+         } finally {
+            // always re-arm, even if the callback throws; otherwise the compareAndSet in flush()
+            // could never succeed again and no later flush would be scheduled
+            SCHEDULED_FLUSH_UPDATER.set(this, 0);
+         }
+         return;
+      }
+
+      @SuppressWarnings("unchecked") // only act() queues non-marker tasks, and it only accepts T
+      final T theTask = (T) task;
+
+      // NOTE(review): assumes sizeGetter returns the same value it returned for this element in act()
+      int estimateSize = sizeGetter.applyAsInt(theTask);
+
+      try {
+         listener.onMessage(theTask);
+      } finally {
+         if (estimateSize > 0) {
+            // release only this element's share; subtracting the whole counter would drop the
+            // accounting for every element that is still queued
+            SIZE_UPDATER.getAndAdd(this, -estimateSize);
+         } else {
+            logger.debug("element " + theTask + " returned an invalid size over the Actor during release");
+         }
+      }
+   }
+
+   /**
+    * Accounts the estimated size of {@code message}, schedules a flush if the pending size is now
+    * above {@code maxSize}, and queues the message for the listener.
+    *
+    * @param message the element to be processed
+    */
+   public void act(T message) {
+      int sizeEstimate = sizeGetter.applyAsInt(message);
+      if (sizeEstimate > 0) {
+         // reuse the estimate computed above instead of asking the sizeGetter a second time
+         int pendingSize = SIZE_UPDATER.addAndGet(this, sizeEstimate);
+         if (pendingSize > maxSize) {
+            flush();
+         }
+      } else {
+         logger.debug("element " + message + " returned an invalid size over the Actor");
+      }
+      task(message);
+   }
+
+   /**
+    * Schedules a flush unless one is already pending: runs {@code overThreshold} right away and
+    * queues the marker that makes {@code doTask} run {@code clearThreshold}.
+    */
+   public void flush() {
+      if (SCHEDULED_FLUSH_UPDATER.compareAndSet(this, 0, 1)) {
+         overThreshold.run();
+         task(FLUSH);
+      }
+   }
+}
\ No newline at end of file
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
new file mode 100644
index 0000000..5c715ec
--- /dev/null
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ThresholdActor}: checks when the over-threshold and clear-threshold callbacks
+ * are invoked relative to the elements handed to the actor.
+ */
+public class ThresholdActorTest {
+
+   // single permit: used by limitedSize to hold back the listener, and by testFlow to detect
+   // two over-threshold callbacks without a clear-threshold callback in between
+   Semaphore semaphore = new Semaphore(1);
+   // incremented by limitedProcess; not asserted by the tests in this class
+   AtomicInteger result = new AtomicInteger(0);
+   AtomicInteger lastProcessed = new AtomicInteger(0);
+   AtomicInteger errors = new AtomicInteger(0);
+
+   /**
+    * With the listener blocked on the semaphore, every element counts as size 1 against a
+    * maximum of 10: the first 10 elements must not trigger the over-threshold callback, the
+    * 11th must trigger it exactly once, and the clear-threshold callback must only run after
+    * the semaphore is released.
+    */
+   @Test
+   public void limitedSize() throws Exception {
+      lastProcessed.set(0);
+      final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+      AtomicInteger timesOpen = new AtomicInteger(0);
+      AtomicInteger timesClose = new AtomicInteger(0);
+      AtomicBoolean open = new AtomicBoolean(true);
+      try {
+         // take the only permit so limitedProcess blocks until the test releases it
+         semaphore.acquire();
+         ThresholdActor<Integer> actor = new ThresholdActor<>(executorService, 
this::limitedProcess, 10, (s) -> 1, () -> {
+            timesClose.incrementAndGet();
+            open.set(false);
+         }, () -> {
+            timesOpen.incrementAndGet();
+            open.set(true);
+         });
+
+         for (int i = 0; i < 10; i++) {
+            actor.act(i);
+         }
+         Assert.assertTrue(open.get());
+         Assert.assertEquals(0, timesClose.get());
+
+         actor.act(99);
+         Assert.assertEquals(1, timesClose.get());
+         Assert.assertEquals(0, timesOpen.get());
+
+         Assert.assertFalse(open.get());
+
+         actor.act(1000);
+
+         actor.flush(); // a flush here should not change anything, as it was 
already called once on the previous overflow
+         Assert.assertEquals(1, timesClose.get());
+         Assert.assertEquals(0, timesOpen.get());
+         Assert.assertFalse(open.get());
+
+         // let the listener run; the queued flush marker should eventually re-open
+         semaphore.release();
+         Wait.assertTrue(open::get);
+
+         Assert.assertEquals(1, timesClose.get());
+         Assert.assertEquals(1, timesOpen.get());
+         Wait.assertEquals(1000, lastProcessed::get, 5000, 1);
+
+         actor.flush();
+
+         open.set(false);
+
+         // measuring after forced flush
+         Wait.assertEquals(2, timesOpen::get, 5000, 1);
+         Wait.assertTrue(open::get);
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
+   /**
+    * Listener used by limitedSize: waits for the semaphore permit, records the element as the
+    * last one processed, then gives the permit back.
+    */
+   public void limitedProcess(Integer i) {
+      try {
+         semaphore.acquire();
+         result.incrementAndGet();
+         lastProcessed.set(i);
+         semaphore.release();
+      } catch (Throwable e) {
+         e.printStackTrace();
+      }
+   }
+
+   /** Test element carrying a sequence number and the size it reports to the actor. */
+   static class Element {
+      Element(int i, int size) {
+         this.i = i;
+         this.size = size;
+      }
+      int i;
+      int size;
+   }
+
+   // not referenced by the tests in this class (testFlow reads e.size through a lambda)
+   private static int getSize(Element e) {
+      return e.size;
+   }
+
+   /** Listener used by testFlow: records the sequence number of the element just processed. */
+   protected void process(Element e) {
+      lastProcessed.set(e.i);
+   }
+
+   /**
+    * Over-threshold callback used by testFlow: takes the semaphore permit without waiting and
+    * counts an error when the permit is not available.
+    */
+   public void block() {
+      try {
+         if (!semaphore.tryAcquire()) {
+            errors.incrementAndGet();
+            System.err.println("acquire failed");
+         }
+      } catch (Exception e) {
+         e.printStackTrace();
+      }
+   }
+
+   /**
+    * Sends elements alternating between size 20 and size 1 against a maximum of 20, with
+    * block() as the over-threshold callback and semaphore.release() as the clear-threshold
+    * callback; all elements must be processed and block() must never fail to get the permit.
+    */
+   @Test
+   public void testFlow() throws Exception {
+      final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+
+      try {
+         ThresholdActor<Element> actor = new ThresholdActor<>(executorService, 
this::process, 20, (e) -> e.size, this::block, semaphore::release);
+
+         final int LAST_ELEMENT = 1000;
+
+         for (int i = 0; i <= LAST_ELEMENT; i++) {
+            actor.act(new Element(i, i % 2 == 0 ? 20 : 1));
+         }
+
+         Wait.assertEquals(LAST_ELEMENT, lastProcessed::get);
+         Assert.assertEquals(0, errors.get());
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
+
+}
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 9e8dee7..2f90bcf 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -61,6 +61,7 @@ import 
org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import 
org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -83,7 +84,7 @@ import 
org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.activemq.artemis.utils.actors.Actor;
+import org.apache.activemq.artemis.utils.actors.ThresholdActor;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -138,6 +139,9 @@ import org.jboss.logging.Logger;
  */
 public class OpenWireConnection extends AbstractRemotingConnection implements 
SecurityAuth, TempQueueObserver {
 
+   // to be used on the packet size estimate processing for the ThresholdActor
+   private static final int MINIMAL_SIZE_ESTIAMTE = 1024;
+
    private static final Logger logger = 
Logger.getLogger(OpenWireConnection.class);
 
    private static final KeepAliveInfo PING = new KeepAliveInfo();
@@ -153,6 +157,8 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    private AMQConnectionContext context;
 
+   private final int actorThresholdBytes;
+
    private final AtomicBoolean stopping = new AtomicBoolean(false);
 
    private final Map<String, SessionId> sessionIdMap = new 
ConcurrentHashMap<>();
@@ -188,10 +194,13 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    private static final AtomicLongFieldUpdater<OpenWireConnection> 
LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, 
"lastSent");
    private volatile long lastSent = -1;
+
+   private volatile boolean autoRead = true;
+
    private ConnectionEntry connectionEntry;
    private boolean useKeepAlive;
    private long maxInactivityDuration;
-   private volatile Actor<Command> openWireActor;
+   private volatile ThresholdActor<Command> openWireActor;
 
    private final Set<SimpleString> knownDestinations = new 
ConcurrentHashSet<>();
 
@@ -202,6 +211,15 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
                              OpenWireProtocolManager openWireProtocolManager,
                              OpenWireFormat wf,
                              Executor executor) {
+      this(connection, server, openWireProtocolManager, wf, executor, 
TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE);
+   }
+
+   public OpenWireConnection(Connection connection,
+                             ActiveMQServer server,
+                             OpenWireProtocolManager openWireProtocolManager,
+                             OpenWireFormat wf,
+                             Executor executor,
+                             int actorThresholdBytes) {
       super(connection, executor);
       this.server = server;
       this.operationContext = server.newOperationContext();
@@ -210,6 +228,8 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       this.outWireFormat = wf.copy();
       this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
       this.maxInactivityDuration = 
openWireProtocolManager.getMaxInactivityDuration();
+      this.transportConnection.setProtocolConnection(this);
+      this.actorThresholdBytes = actorThresholdBytes;
    }
 
    // SecurityAuth implementation
@@ -282,9 +302,9 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             traceBufferReceived(connectionID, command);
          }
 
-         final Actor<Command> localVisibleActor = openWireActor;
+         final ThresholdActor<Command> localVisibleActor = openWireActor;
          if (localVisibleActor != null) {
-            openWireActor.act(command);
+            localVisibleActor.act(command);
          } else {
             act(command);
          }
@@ -295,6 +315,30 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    }
 
+   /**
+    * Marks the connection as allowed to read again after {@link #blockConnection()}.
+    * Only acts when {@code autoRead} is currently false: the flag is set back to true and a
+    * flush is requested on the actor, whose clear-threshold callback ({@code flushedActor})
+    * applies the flag to the transport connection.
+    */
+   public void restoreAutoRead() {
+      if (!autoRead) {
+         autoRead = true;
+         openWireActor.flush();
+      }
+   }
+
+   /**
+    * Blocks the connection: records {@code autoRead} as false, so that {@code flushedActor}
+    * keeps reads disabled, and turns off auto read and TTL right away.
+    */
+   public void blockConnection() {
+      autoRead = false;
+      disableAutoRead();
+   }
+
+   /**
+    * Turns off auto read on the transport connection and disables the TTL.
+    * Also passed to the ThresholdActor as its over-threshold callback; it does not change
+    * the {@code autoRead} flag.
+    */
+   private void disableAutoRead() {
+      getTransportConnection().setAutoRead(false);
+      disableTtl();
+   }
+
+   /**
+    * Clear-threshold callback of the ThresholdActor: applies the current {@code autoRead}
+    * flag to the transport connection and re-enables the TTL only when reads are allowed.
+    */
+   protected void flushedActor() {
+      getTransportConnection().setAutoRead(autoRead);
+      if (autoRead) {
+         enableTtl();
+      }
+   }
+
 
    private void act(Command command) {
       try {
@@ -762,11 +806,19 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       createInternalSession(info);
 
       // the actor can only be used after the WireFormat has been initialized 
with versioning
-      this.openWireActor = new Actor<>(executor, this::act);
+      this.openWireActor = new ThresholdActor<>(executor, this::act, 
actorThresholdBytes, OpenWireConnection::getSize, this::disableAutoRead, 
this::flushedActor);
 
       return context;
    }
 
+   /**
+    * Size estimate used by the ThresholdActor for a command: the size reported by the message
+    * itself for an {@code ActiveMQMessage}, a fixed minimal estimate for any other command.
+    */
+   private static int getSize(Command command) {
+      return command instanceof ActiveMQMessage ? ((ActiveMQMessage) command).getSize() : MINIMAL_SIZE_ESTIAMTE;
+   }
+
    private void createInternalSession(ConnectionInfo info) throws Exception {
       internalSession = 
server.createSession(UUIDGenerator.getInstance().generateStringUUID(), 
context.getUserName(), info.getPassword(), -1, this, true, false, false, false, 
null, null, true, operationContext, protocolManager.getPrefixes(), 
protocolManager.getSecurityDomain());
    }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 0ce2093..4aa54f4 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -41,7 +41,9 @@ import 
org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.api.core.client.TopologyMember;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import 
org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@@ -97,6 +99,8 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
 
    private boolean prefixPacketSize = true;
 
+   private int actorThresholdBytes = -1;
+
    private BrokerId brokerId;
    protected final ProducerId advisoryProducerId = new ProducerId();
 
@@ -236,6 +240,17 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
       }
    }
 
+   /**
+    * Returns the override, in bytes, for the threshold given to each OpenWire connection's actor.
+    * When greater than zero, this value is used instead of the acceptor's tcpReceiveBufferSize.
+    * The default is -1 (no override); it should not be used unless in extreme situations,
+    * such as slow storage.
+    */
+   public int getActorThresholdBytes() {
+      return actorThresholdBytes;
+   }
+
+   /**
+    * Sets the override, in bytes, for the actor threshold of connections created afterwards.
+    * A value that is not greater than zero leaves the acceptor-derived threshold in place.
+    *
+    * @return this protocol manager, to allow chained calls
+    */
+   public OpenWireProtocolManager setActorThresholdBytes(int 
actorThresholdBytes) {
+      this.actorThresholdBytes = actorThresholdBytes;
+      return this;
+   }
+
    public ScheduledExecutorService getScheduledPool() {
       return scheduledPool;
    }
@@ -293,10 +308,25 @@ public class OpenWireProtocolManager  extends 
AbstractProtocolManager<Command, O
       return super.invokeInterceptors(this.outgoingInterceptors, command, 
connection);
    }
 
+   /**
+    * Resolves the actor threshold, in bytes, for a connection accepted by {@code acceptorUsed}.
+    * Precedence: an explicit {@code actorThresholdBytes} greater than zero, then the TCP receive
+    * buffer size of a Netty acceptor, then the default TCP receive buffer size.
+    */
+   private int getActorThreadshold(Acceptor acceptorUsed) {
+      if (this.actorThresholdBytes > 0) {
+         // an explicit setting replaces any acceptor-derived value
+         return this.actorThresholdBytes;
+      }
+
+      if (acceptorUsed instanceof NettyAcceptor) {
+         return ((NettyAcceptor) acceptorUsed).getTcpReceiveBufferSize();
+      }
+
+      return TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE;
+   }
+
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection connection) {
       OpenWireFormat wf = (OpenWireFormat) wireFactory.createWireFormat();
-      OpenWireConnection owConn = new OpenWireConnection(connection, server, 
this, wf, server.getExecutorFactory().getExecutor());
+      OpenWireConnection owConn = new OpenWireConnection(connection, server, 
this, wf, server.getExecutorFactory().getExecutor(), 
getActorThreadshold(acceptorUsed));
       owConn.sendHandshake();
 
       //first we setup ttl to -1
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index e3e0b0e..d9fcec0 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -89,8 +89,6 @@ public class AMQSession implements SessionCallback {
 
    private final OpenWireProtocolManager protocolManager;
 
-   private final Runnable enableAutoReadAndTtl;
-
    private final CoreMessageObjectPools coreMessageObjectPools;
 
    private String[] existingQueuesCache;
@@ -110,7 +108,6 @@ public class AMQSession implements SessionCallback {
       this.protocolManager = protocolManager;
       this.scheduledPool = protocolManager.getScheduledPool();
       this.protocolManagerWireFormat = protocolManager.wireFormat().copy();
-      this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
       this.existingQueuesCache = null;
       this.coreMessageObjectPools = coreMessageObjectPools;
    }
@@ -424,20 +421,16 @@ public class AMQSession implements SessionCallback {
          }
          final PagingStore store = 
server.getPagingManager().getPageStore(address);
 
-         this.connection.disableTtl();
          if (shouldBlockProducer) {
             sendShouldBlockProducer(producerInfo, messageSend, 
sendProducerAck, store, dest, count, coreMsg, address);
          } else {
-            //non-persistent messages goes here, by default we stop reading 
from
-            //transport
-            connection.getTransportConnection().setAutoRead(false);
             if (store != null) {
-               if (!store.checkMemory(enableAutoReadAndTtl)) {
-                  enableAutoReadAndTtl();
+               if (!store.checkMemory(true, this::restoreAutoRead, 
this::blockConnection)) {
+                  restoreAutoRead();
                   throw new ResourceAllocationException("Queue is full " + 
address);
                }
             } else {
-               enableAutoReadAndTtl.run();
+               restoreAutoRead();
             }
 
             getCoreSession().send(coreMsg, false, dest.isTemporary());
@@ -515,7 +508,7 @@ public class AMQSession implements SessionCallback {
          }
       };
       if (store != null) {
-         if (!store.checkMemory(false, task)) {
+         if (!store.checkMemory(false, task, null)) {
             this.connection.getContext().setDontSendReponse(false);
             connection.enableTtl();
             throw new ResourceAllocationException("Queue is full " + address);
@@ -525,9 +518,12 @@ public class AMQSession implements SessionCallback {
       }
    }
 
-   private void enableAutoReadAndTtl() {
-      connection.getTransportConnection().setAutoRead(true);
-      connection.enableTtl();
+   private void restoreAutoRead() {
+      connection.restoreAutoRead();
+   }
+
+   private void blockConnection() {
+      connection.blockConnection();
    }
 
    public String convertWildcard(ActiveMQDestination openWireDest) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 275805e..bbbf3a9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -132,7 +132,7 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
 
    boolean checkMemory(Runnable runnable);
 
-   boolean checkMemory(boolean runOnFailure, Runnable runnable);
+   boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable 
runWhenBlocking);
 
    boolean isFull();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 3133096..864a7ad 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -698,11 +698,11 @@ public class PagingStoreImpl implements PagingStore {
 
    @Override
    public boolean checkMemory(final Runnable runWhenAvailable) {
-      return checkMemory(true, runWhenAvailable);
+      return checkMemory(true, runWhenAvailable, null);
    }
 
    @Override
-   public boolean checkMemory(boolean runOnFailure, final Runnable 
runWhenAvailable) {
+   public boolean checkMemory(boolean runOnFailure, final Runnable 
runWhenAvailable, Runnable runWhenBlocking) {
 
       if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && 
(maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
          if (isFull()) {
@@ -712,7 +712,10 @@ public class PagingStoreImpl implements PagingStore {
             return false;
          }
       } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == 
AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
-         if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > 
maxSize || pagingManager.isGlobalFull()) {
+         if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= 
maxSize || pagingManager.isGlobalFull()) {
+            if (runWhenBlocking != null) {
+               runWhenBlocking.run();
+            }
 
             
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 9e19fc5..c3d62af 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -396,6 +396,10 @@ public class NettyAcceptor extends AbstractAcceptor {
       }
    }
 
+   /**
+    * @return the TCP receive buffer size held by this acceptor's {@code tcpReceiveBufferSize} field
+    */
+   public int getTcpReceiveBufferSize() {
+      return tcpReceiveBufferSize;
+   }
+
    @Override
    public synchronized void start() throws Exception {
       if (channelClazz != null) {
diff --git 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 09c8831..3d6fd5b 100644
--- 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -410,7 +410,7 @@ public class PersistMultiThreadTest extends 
ActiveMQTestBase {
       }
 
       @Override
-      public boolean checkMemory(boolean runOnFailure, Runnable runnable) {
+      public boolean checkMemory(boolean runOnFailure, Runnable runnable, 
Runnable ignoredRunnable) {
          return false;
       }
 
diff --git a/tests/smoke-tests/src/main/resources/servers/paging/broker.xml 
b/tests/smoke-tests/src/main/resources/servers/paging/broker.xml
index 46ccdec..bbe3b88 100644
--- a/tests/smoke-tests/src/main/resources/servers/paging/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/paging/broker.xml
@@ -103,7 +103,7 @@ under the License.
          <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
 
          <!-- Acceptor for every supported protocol -->
-         <acceptor 
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor 
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300;actorThresholdBytes=10000</acceptor>
 
          <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
          <acceptor 
name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;useKQueue=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java
new file mode 100644
index 0000000..62323de
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/paging/FloodServerWithAsyncSendTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FloodServerWithAsyncSendTest extends SmokeTestBase {
+
+   private static final Logger logger = 
Logger.getLogger(FloodServerWithAsyncSendTest.class);
+   public static final String SERVER_NAME_0 = "paging";
+
+   volatile boolean running = true; // loop flag read by produce() and infiniteConsume(); internalTest() sets it to false to stop them
+
+   AtomicInteger errors = new AtomicInteger(0); // incremented in the catch blocks of produce() and infiniteConsume(); internalTest() asserts it is 0
+
+   @Before
+   public void before() throws Exception {
+      cleanupData(SERVER_NAME_0); // presumably wipes data left by a previous run of the "paging" server - confirm against SmokeTestBase
+      startServer(SERVER_NAME_0, 0, 30000); // NOTE(review): 0 and 30000 look like a port offset/id and a startup timeout in ms - confirm against SmokeTestBase
+   }
+
+   /**
+    * Runs the flood scenario of {@link #internalTest(String)} using the OPENWIRE protocol.
+    */
+   @Test
+   public void testAsyncPagingOpenWire() throws Exception {
+      internalTest("OPENWIRE");
+   }
+
+   /**
+    * Creates the connection factory used by this test for the given protocol.
+    * <p>
+    * Only OPENWIRE (compared case-insensitively) is accepted; the factory points at
+    * {@code tcp://localhost:61616} with {@code jms.useAsyncSend=true}. Any other value,
+    * including {@code null}, fails the test.
+    *
+    * @param protocol the protocol name
+    * @return the connection factory for {@code protocol}
+    */
+   ConnectionFactory newCF(String protocol) {
+      // constant-first comparison: a null protocol is reported as unsupported instead of throwing an NPE
+      if (!"OPENWIRE".equalsIgnoreCase(protocol)) {
+         Assert.fail("unsupported protocol: " + protocol);
+         return null; // not reached at runtime (Assert.fail throws); required by the compiler
+      }
+      return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616?jms.useAsyncSend=true");
+   }
+
+   /**
+    * Floods the server with sends and then verifies it is still usable.
+    * <p>
+    * For each of {@code queue0} and {@code queue1} one {@link #produce} task and one
+    * {@link #infiniteConsume} task are started on a 4-thread pool. After 10 seconds the
+    * {@code running} flag is cleared and the tasks get up to one minute to finish. The test
+    * then expects to consume 20 more messages from each queue, performs a single
+    * send/receive round trip on {@code queue3}, and finally asserts that no task recorded
+    * an error.
+    *
+    * @param protocol the protocol used for every connection opened by the test
+    * @throws Exception on any JMS failure or if the thread is interrupted while waiting
+    */
+   private void internalTest(String protocol) throws Exception {
+      ExecutorService executorService = Executors.newFixedThreadPool(4);
+      try {
+         for (int i = 0; i < 2; i++) {
+            final String queueName = "queue" + i;
+            executorService.execute(() -> produce(protocol, queueName));
+            executorService.execute(() -> infiniteConsume(protocol, queueName));
+         }
+
+         Thread.sleep(10_000);
+
+         running = false;
+
+         executorService.shutdown();
+         Assert.assertTrue(executorService.awaitTermination(1, TimeUnit.MINUTES));
+
+         for (int i = 0; i < 2; i++) {
+            Assert.assertEquals("should have received at least a few messages", 20, consume(protocol, "queue" + i, 20));
+         }
+
+         // use the protocol under test rather than a hard-coded name
+         ConnectionFactory factory = newCF(protocol);
+         Connection connection = factory.createConnection();
+         try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("queue3");
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            MessageProducer producer = session.createProducer(queue);
+
+            String random = RandomUtil.randomString();
+
+            producer.send(session.createTextMessage(random));
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(random, message.getText());
+         } finally {
+            // close even when one of the assertions above fails, so the connection is not leaked
+            connection.close();
+         }
+
+         Assert.assertEquals(0, errors.get());
+      } finally {
+         running = false;
+         executorService.shutdownNow(); // just to avoid thread leakage in case anything failed on the test
+      }
+
+   }
+
+
+   /**
+    * Consumes messages from {@code queueName} for as long as the {@code running} flag is set.
+    * <p>
+    * The loop also ends as soon as a {@code receive(5000)} call returns no message. Progress
+    * is logged every 10 messages. Any exception is logged, counted in {@code errors} and
+    * turned into a return value of -1.
+    *
+    * @param protocol  the protocol passed to {@link #newCF(String)}
+    * @param queueName the queue to consume from
+    * @return the number of messages received, or -1 if an exception occurred
+    */
+   protected int infiniteConsume(String protocol, String queueName) {
+      ConnectionFactory factory = newCF(protocol);
+      Connection connection = null;
+      int rec = 0;
+      try {
+         connection = factory.createConnection();
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageConsumer consumer = session.createConsumer(queue);
+         while (running) {
+            if (consumer.receive(5000) == null) {
+               // nothing arrived within the receive timeout: stop consuming
+               break;
+            }
+            rec++;
+            if (rec % 10 == 0) {
+               logger.info(queueName + " receive " + rec);
+            }
+         }
+
+         return rec;
+      } catch (Exception e) {
+         // log through the test logger, as produce() does, instead of printing to stderr
+         logger.warn(e.getMessage(), e);
+         errors.incrementAndGet();
+         return -1;
+      } finally {
+         // connection stays null when createConnection() itself failed
+         if (connection != null) {
+            try {
+               connection.close();
+            } catch (Exception ignored) {
+               // best-effort close; a failure here must not change the outcome
+            }
+         }
+      }
+   }
+
+
+
+   /**
+    * Consumes up to {@code maxCount} messages from {@code queueName}.
+    * <p>
+    * Stops early as soon as a {@code receive(5000)} call returns no message. Progress is
+    * logged every 10 messages. Unlike {@link #infiniteConsume}, exceptions are propagated
+    * to the caller.
+    *
+    * @param protocol  the protocol passed to {@link #newCF(String)}
+    * @param queueName the queue to consume from
+    * @param maxCount  the maximum number of messages to receive
+    * @return the number of messages actually received
+    * @throws Exception on any JMS failure
+    */
+   protected int consume(String protocol, String queueName, int maxCount) throws Exception {
+      ConnectionFactory factory = newCF(protocol);
+      Connection connection = null;
+      int rec = 0;
+      try {
+         connection = factory.createConnection();
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageConsumer consumer = session.createConsumer(queue);
+         while (rec < maxCount) {
+            if (consumer.receive(5000) == null) {
+               // nothing arrived within the receive timeout: give up and report what we got
+               break;
+            }
+            rec++;
+            if (rec % 10 == 0) {
+               logger.info(queueName + " receive " + rec);
+            }
+         }
+
+         return rec;
+      } finally {
+         // connection stays null when createConnection() itself failed
+         if (connection != null) {
+            try {
+               connection.close();
+            } catch (Exception ignored) {
+               // best-effort close; a failure here must not hide the result or the original exception
+            }
+         }
+      }
+   }
+
+   /**
+    * Sends text messages to {@code queueName} in a tight loop while the {@code running}
+    * flag is set.
+    * <p>
+    * Every message carries the same payload: random strings concatenated until the text is
+    * at least 10000 characters long. Progress is logged every 10 messages. Any
+    * {@link Throwable} is logged and counted in {@code errors} rather than propagated.
+    *
+    * @param protocol  the protocol passed to {@link #newCF(String)}
+    * @param queueName the queue to send to
+    */
+   protected void produce(String protocol, String queueName) {
+
+      int produced = 0;
+      ConnectionFactory factory = newCF(protocol);
+      Connection connection = null;
+      try {
+
+         connection = factory.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+
+         // the builder never leaves this thread, so the unsynchronized StringBuilder is sufficient
+         StringBuilder payload = new StringBuilder();
+         while (payload.length() < 10000) {
+            payload.append(RandomUtil.randomString());
+         }
+         String randomString = payload.toString();
+
+         while (running) {
+            if (++produced % 10 == 0) {
+               logger.info(queueName + " produced " + produced + " messages");
+            }
+            producer.send(session.createTextMessage(randomString));
+         }
+
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         errors.incrementAndGet();
+      } finally {
+         // connection stays null when createConnection() itself failed
+         if (connection != null) {
+            try {
+               connection.close();
+            } catch (Exception ignored) {
+               // best-effort close; a failure here must not change the outcome
+            }
+         }
+      }
+   }
+
+}

Reply via email to