This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 4e5fd4a585 ARTEMIS-5002 AMQP producer not unblock if the disk space is
freed
4e5fd4a585 is described below
commit 4e5fd4a58567688fabf5aec39fee3bcfaeb6de90
Author: Howard Gao <[email protected]>
AuthorDate: Sun Aug 18 22:24:46 2024 +0800
ARTEMIS-5002 AMQP producer not unblock if the disk space is freed
---
.../core/paging/impl/PagingManagerImpl.java | 5 +
.../artemis/core/paging/impl/PagingStoreImpl.java | 3 +-
.../core/server/files/FileStoreMonitor.java | 7 ++
.../tests/integration/amqp/GlobalDiskFullTest.java | 101 +++++++++++++++------
4 files changed, 89 insertions(+), 27 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 25c4549780..0b76155d66 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -187,6 +188,10 @@ public final class PagingManagerImpl implements
PagingManager {
blockedStored.add(store);
}
+ public Set<PagingStore> getBlockedSet() {
+ return new HashSet<>(blockedStored);
+ }
+
@Override
public void onChange() {
reapplySettings();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f4f7cbbdea..432a0eb190 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1096,6 +1096,7 @@ public class PagingStoreImpl implements PagingStore {
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
addToBlockList(runWhenAvailable, blockedCallback);
+ pagingManager.addBlockedStore(this);
}
return false;
}
@@ -1178,7 +1179,7 @@ public class PagingStoreImpl implements PagingStore {
}
}
- return false;
+ return !blocking;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
index 273304f3bd..5e2f73edce 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java
@@ -75,6 +75,13 @@ public class FileStoreMonitor extends
ActiveMQScheduledComponent {
}
}
+ public FileStoreMonitor removeCallback(Callback callback) {
+ synchronized (monitorLock) {
+ callbackList.remove(callback);
+ }
+ return this;
+ }
+
public FileStoreMonitor addStore(File file) throws IOException {
synchronized (monitorLock) {
// JDBC storage may return this as null, and we may need to ignore it
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
index bf260770e4..74e538d492 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java
@@ -19,22 +19,56 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.lang.invoke.MethodHandles;
import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
+import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
+import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+@ExtendWith(ParameterizedTestExtension.class)
public class GlobalDiskFullTest extends AmqpClientTestSupport {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Parameter(index = 0)
+ public AddressFullMessagePolicy addressFullPolicy;
+
+ @Parameters(name = "addressFullPolicy={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP},
{AddressFullMessagePolicy.PAGE}
+ });
+ }
+
+ @Override
+ protected void configureAddressPolicy(ActiveMQServer server) {
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setAddressFullMessagePolicy(addressFullPolicy);
+ server.getConfiguration().addAddressSetting(getQueueName(),
addressSettings);
+ }
@Override
protected void addConfiguration(ActiveMQServer server) {
@@ -42,15 +76,32 @@ public class GlobalDiskFullTest extends
AmqpClientTestSupport {
serverConfig.setDiskScanPeriod(100);
}
- @Test
- public void testProducerOnDiskFull() throws Exception {
- FileStoreMonitor monitor =
((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);
- final CountDownLatch latch = new CountDownLatch(1);
- monitor.addCallback((usableSpace, totalSpace, ok, type) -> {
+ protected void waitMonitor(FileStoreMonitor monitor) throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ FileStoreMonitor.Callback callback = (a, b, c, d) -> {
latch.countDown();
- });
+ };
- assertTrue(latch.await(1, TimeUnit.MINUTES));
+ monitor.addCallback(callback);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+ monitor.removeCallback(callback);
+ }
+
+
+ @TestTemplate
+ public void testProducerOnDiskFull() throws Exception {
+
+ FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor();
+
+ waitMonitor(monitor);
+
+ //make it full
+ monitor.setMaxUsage(0.0);
+
+ waitMonitor(monitor);
AmqpClient client = createAmqpClient(new URI("tcp://localhost:" +
AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());
@@ -60,59 +111,57 @@ public class GlobalDiskFullTest extends
AmqpClientTestSupport {
AmqpSender sender = session.createSender(getQueueName());
byte[] payload = new byte[1000];
-
AmqpSender anonSender = session.createSender();
CountDownLatch sentWithName = new CountDownLatch(1);
CountDownLatch sentAnon = new CountDownLatch(1);
- Thread threadWithName = new Thread(() -> {
+ ExecutorService pool = Executors.newCachedThreadPool();
+ runAfter(pool::shutdownNow);
+ pool.execute(() -> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.setSendTimeout(-1);
sender.send(message);
} catch (Exception e) {
- e.printStackTrace();
+ logger.warn("Caught exception while sending", e);
} finally {
sentWithName.countDown();
}
});
-
- threadWithName.start();
-
-
- Thread threadWithAnon = new Thread(() -> {
+ pool.execute(()-> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
anonSender.setSendTimeout(-1);
message.setAddress(getQueueName());
anonSender.send(message);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
sentAnon.countDown();
+ } catch (Exception e) {
+ logger.warn("Caught exception while sending", e);
}
});
- threadWithAnon.start();
+ PagingManagerImpl pagingManager = (PagingManagerImpl)
server.getPagingManager();
+ Wait.assertTrue(() -> pagingManager.getBlockedSet().size() > 0, 5000);
- assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread
sender should be blocked");
- assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread
sender anonymous should be blocked");
+ assertFalse(sentWithName.await(100, TimeUnit.MILLISECONDS), "Thread
sender should be blocked");
+ assertFalse(sentAnon.await(100, TimeUnit.MILLISECONDS), "Thread
sender anonymous should be blocked");
+ // unblock
monitor.setMaxUsage(100.0);
+ waitMonitor(monitor);
+
assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender
should be released");
assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender
anonymous should be released");
- threadWithName.join(TimeUnit.SECONDS.toMillis(30));
- threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
- assertFalse(threadWithName.isAlive());
- assertFalse(threadWithAnon.isAlive());
+ Wait.assertEquals(0, () -> pagingManager.getBlockedSet().size(),
5000);
} finally {
connection.close();
}
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact