This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e5fd4a585 ARTEMIS-5002 AMQP producer not unblock if the disk space is freed
4e5fd4a585 is described below

commit 4e5fd4a58567688fabf5aec39fee3bcfaeb6de90
Author: Howard Gao <[email protected]>
AuthorDate: Sun Aug 18 22:24:46 2024 +0800

    ARTEMIS-5002 AMQP producer not unblock if the disk space is freed
---
 .../core/paging/impl/PagingManagerImpl.java        |   5 +
 .../artemis/core/paging/impl/PagingStoreImpl.java  |   3 +-
 .../core/server/files/FileStoreMonitor.java        |   7 ++
 .../tests/integration/amqp/GlobalDiskFullTest.java | 101 +++++++++++++++------
 4 files changed, 89 insertions(+), 27 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 25c4549780..0b76155d66 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.impl;
 
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -187,6 +188,10 @@ public final class PagingManagerImpl implements PagingManager {
       blockedStored.add(store);
    }
 
+   public Set<PagingStore> getBlockedSet() {
+      return new HashSet<>(blockedStored);
+   }
+
    @Override
    public void onChange() {
       reapplySettings();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f4f7cbbdea..432a0eb190 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1096,6 +1096,7 @@ public class PagingStoreImpl implements PagingStore {
          if (isFull()) {
             if (runOnFailure && runWhenAvailable != null) {
                addToBlockList(runWhenAvailable, blockedCallback);
+               pagingManager.addBlockedStore(this);
             }
             return false;
          }
@@ -1178,7 +1179,7 @@ public class PagingStoreImpl implements PagingStore {
          }
       }
 
-      return false;
+      return !blocking;
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index 273304f3bd..5e2f73edce 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -75,6 +75,13 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent {
       }
    }
 
+   public FileStoreMonitor removeCallback(Callback callback) {
+      synchronized (monitorLock) {
+         callbackList.remove(callback);
+      }
+      return this;
+   }
+
    public FileStoreMonitor addStore(File file) throws IOException {
       synchronized (monitorLock) {
          // JDBC storage may return this as null, and we may need to ignore it
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
index bf260770e4..74e538d492 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
@@ -19,22 +19,56 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.lang.invoke.MethodHandles;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
+import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class GlobalDiskFullTest extends AmqpClientTestSupport {
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Parameter(index = 0)
+   public AddressFullMessagePolicy addressFullPolicy;
+
+   @Parameters(name = "addressFullPolicy={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP}, {AddressFullMessagePolicy.PAGE}
+      });
+   }
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAddressFullMessagePolicy(addressFullPolicy);
+      server.getConfiguration().addAddressSetting(getQueueName(), addressSettings);
+   }
 
    @Override
    protected void addConfiguration(ActiveMQServer server) {
@@ -42,15 +76,32 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
       serverConfig.setDiskScanPeriod(100);
    }
 
-   @Test
-   public void testProducerOnDiskFull() throws Exception {
-      FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
-      final CountDownLatch latch = new CountDownLatch(1);
-      monitor.addCallback((usableSpace, totalSpace, ok, type) -> {
+   protected void waitMonitor(FileStoreMonitor monitor) throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+
+      FileStoreMonitor.Callback callback = (a, b, c, d) -> {
          latch.countDown();
-      });
+      };
 
-      assertTrue(latch.await(1, TimeUnit.MINUTES));
+      monitor.addCallback(callback);
+
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+      monitor.removeCallback(callback);
+   }
+
+
+   @TestTemplate
+   public void testProducerOnDiskFull() throws Exception {
+
+      FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor();
+
+      waitMonitor(monitor);
+
+      //make it full
+      monitor.setMaxUsage(0.0);
+
+      waitMonitor(monitor);
 
       AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
       AmqpConnection connection = addConnection(client.connect());
@@ -60,59 +111,57 @@ public class GlobalDiskFullTest extends AmqpClientTestSupport {
          AmqpSender sender = session.createSender(getQueueName());
          byte[] payload = new byte[1000];
 
-
          AmqpSender anonSender = session.createSender();
 
          CountDownLatch sentWithName = new CountDownLatch(1);
          CountDownLatch sentAnon = new CountDownLatch(1);
 
-         Thread threadWithName = new Thread(() -> {
+         ExecutorService pool = Executors.newCachedThreadPool();
+         runAfter(pool::shutdownNow);
 
+         pool.execute(() -> {
             try {
                final AmqpMessage message = new AmqpMessage();
                message.setBytes(payload);
                sender.setSendTimeout(-1);
                sender.send(message);
             } catch (Exception e) {
-               e.printStackTrace();
+               logger.warn("Caught exception while sending", e);
             } finally {
                sentWithName.countDown();
             }
          });
-
-         threadWithName.start();
-
-
-         Thread threadWithAnon = new Thread(() -> {
+         pool.execute(()-> {
             try {
                final AmqpMessage message = new AmqpMessage();
                message.setBytes(payload);
                anonSender.setSendTimeout(-1);
                message.setAddress(getQueueName());
                anonSender.send(message);
-            } catch (Exception e) {
-               e.printStackTrace();
-            } finally {
                sentAnon.countDown();
+            } catch (Exception e) {
+               logger.warn("Caught exception while sending", e);
             }
          });
 
-         threadWithAnon.start();
+         PagingManagerImpl pagingManager = (PagingManagerImpl) server.getPagingManager();
+         Wait.assertTrue(() -> pagingManager.getBlockedSet().size() > 0, 5000);
 
-         assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked");
-         assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked");
+         assertFalse(sentWithName.await(100, TimeUnit.MILLISECONDS), "Thread sender should be blocked");
+         assertFalse(sentAnon.await(100, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked");
 
+         // unblock
          monitor.setMaxUsage(100.0);
 
+         waitMonitor(monitor);
+
          assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released");
          assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released");
 
-         threadWithName.join(TimeUnit.SECONDS.toMillis(30));
-         threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
-         assertFalse(threadWithName.isAlive());
-         assertFalse(threadWithAnon.isAlive());
+         Wait.assertEquals(0, () -> pagingManager.getBlockedSet().size(), 5000);
       } finally {
          connection.close();
       }
    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to