gemmellr commented on a change in pull request #3863:
URL: https://github.com/apache/activemq-artemis/pull/3863#discussion_r758286287
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable
runWhenAvailable) {
@Override
public boolean checkMemory(boolean runOnFailure, final Runnable
runWhenAvailable) {
+ if (blocked) {
+ if (runWhenAvailable != null) {
+
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+ }
+ ActiveMQServerLogger.LOGGER.blockingMessageProduction(address,
sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
Review comment:
This could be spammy, since it logs on every call. The existing usage of this
logger call is guarded by the 'blocking' boolean so that it acts more as a log
that the state has toggled. The equivalent here would seem to be making this
logger call in the blocked() method (and checking the boolean) rather than here.
Perhaps it should have its own logger call, given that its matching 'unblock'
is really separate from the existing 'blocking' mechanism.
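
To illustrate the "log only when the state toggles" idea, here is a minimal
sketch. The class and member names (ToggleLoggingStore, manuallyBlocked,
logLines) are hypothetical stand-ins, not the real PagingStoreImpl or
ActiveMQServerLogger, and a println takes the place of the logger call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the store; not the real PagingStoreImpl.
final class ToggleLoggingStore {
   private final AtomicBoolean manuallyBlocked = new AtomicBoolean(false);
   // Counts emitted log lines so the behaviour is observable without a logger.
   final AtomicInteger logLines = new AtomicInteger();

   void block() {
      // compareAndSet succeeds only on the false -> true transition,
      // so repeated block() calls do not log again.
      if (manuallyBlocked.compareAndSet(false, true)) {
         log("address blocked");
      }
   }

   void unblock() {
      // Likewise, only the true -> false transition is logged.
      if (manuallyBlocked.compareAndSet(true, false)) {
         log("address unblocked");
      }
   }

   boolean checkMemory() {
      // Hot path, called per send: it only reads the flag and never logs.
      return !manuallyBlocked.get();
   }

   private void log(String message) {
      logLines.incrementAndGet();
      System.out.println(message);
   }
}
```

With this shape, any number of checkMemory() calls while blocked produces no
log output; only the block()/unblock() transitions do.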
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -1153,6 +1161,31 @@ public boolean isFull() {
return maxSize > 0 && getAddressSize() > maxSize ||
pagingManager.isGlobalFull();
}
+
+ @Override
+ public int getAddressLimitPercent() {
+ final long currentUsage = getAddressSize();
+ if (currentUsage != 0) {
+ if (maxSize > 0) {
+ return Math.toIntExact((currentUsage / maxSize) * 100);
+ } else if (pagingManager.isUsingGlobalSize()) {
+ return Math.toIntExact((currentUsage /
pagingManager.getGlobalSize()) * 100);
+ }
+ }
+ return 0;
+ }
+
+ @Override
+ public void block() {
+ blocked = true;
+ }
+
+ @Override
+ public void unBlock() {
Review comment:
No capital needed in unblock.
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated()
throws Exception {
}
}
+ @Test(timeout = 60000)
+ public void
testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked()
throws Exception {
Review comment:
```suggestion
public void
testCreditIsNotGivenOnLinkCreationWhileBlockedAndIsGivenOnceThenUnblocked()
throws Exception {
```
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void
testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
}
+ @Test(timeout = 10000)
+ public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception
{
+ Connection connection = createConnection(new
URI(singleCreditAcceptorURI.replace("tcp", "amqp")), null, null, null, true);
+ final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Destination d = session.createQueue(getQueueName());
+ final MessageProducer p = session.createProducer(d);
+
+ final CountDownLatch running = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ AddressControl addressControl =
ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()),
mBeanServer);
+
+ assertTrue("blocked ok", addressControl.block());
+
+ // one credit
+ p.send(session.createBytesMessage());
+
+ // this send will block, no credit
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ running.countDown();
+ p.send(session.createBytesMessage());
+ } catch (JMSException ignored) {
+ } finally {
+ done.countDown();
+ }
+ }
+ }).start();
+
+ assertTrue(running.await(5, TimeUnit.SECONDS));
+
+ assertFalse(done.await(1, TimeUnit.SECONDS));
Review comment:
I don't think it needs to wait as long as this, especially if you verify there
is no credit left. The message will never actually leave the client until
some credit arrives, so waiting an entire second after knowing the thread started
and is about to call send doesn't seem necessary in the typical case.
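
As a rough sketch of that shape (confirm there is no credit, then use only a
brief negative wait), something like the following. A Semaphore stands in for
link credit and the class name BlockedSendSketch is hypothetical; this is not
the JMS or AMQP test client API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical model: semaphore permits stand in for AMQP link credit.
final class BlockedSendSketch {
   final Semaphore credit = new Semaphore(0);
   final CountDownLatch running = new CountDownLatch(1);
   final CountDownLatch done = new CountDownLatch(1);

   void startSender() {
      Thread sender = new Thread(() -> {
         running.countDown();
         try {
            // The "send" cannot proceed until a permit (credit) is available.
            credit.acquire();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
         } finally {
            done.countDown();
         }
      });
      sender.setDaemon(true);
      sender.start();
   }

   // Awaits the latch for the given time; returns false on timeout or interrupt.
   static boolean await(CountDownLatch latch, long millis) {
      try {
         return latch.await(millis, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }
}
```

Once the zero-credit state has been checked explicitly, a negative wait of a
few tens of milliseconds on the done latch serves the same purpose as the
full second.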
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -126,6 +126,8 @@
private volatile boolean blocking = false;
+ private volatile boolean blocked = false;
Review comment:
I would give this (and possibly also the existing 'blocking' variable) a
different name so it's more distinguishable, e.g. manuallyBlocked. It's not at all
clear with blocked+blocking that they are really quite unrelated, and that
setting the store 'blocked' does not mean it is 'blocking'. Which leads to the
next comment.
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
##########
@@ -695,6 +697,14 @@ public boolean checkMemory(final Runnable
runWhenAvailable) {
@Override
public boolean checkMemory(boolean runOnFailure, final Runnable
runWhenAvailable) {
+ if (blocked) {
+ if (runWhenAvailable != null) {
+
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
Review comment:
This seems questionable: the onMemoryFreedRunnables queue is already
used by the existing disk usage based 'blocking' mechanism. Using it for this
new but separate 'blocked' mechanism as well means there are likely states in
which the other [un]'blocking' mechanism could actually run these actions even
though the store was and is still 'blocked'.
(Perhaps also vice versa when unBlock() is called.)
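
One possible way to keep the two mechanisms from releasing each other's
parked actions is sketched below. It is only an illustration under
assumptions: SeparateQueuesStore, onManualUnblock, setDiskFull and resubmit
are hypothetical names, and the real store's threading and AtomicRunnable
handling are not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model with one queue per blocking reason.
final class SeparateQueuesStore {
   private boolean manuallyBlocked;
   private boolean diskFull;
   private final Queue<Runnable> onMemoryFreed = new ArrayDeque<>();
   private final Queue<Runnable> onManualUnblock = new ArrayDeque<>();

   synchronized boolean checkMemory(Runnable runWhenAvailable) {
      if (manuallyBlocked) {
         onManualUnblock.add(runWhenAvailable);
         return false;
      }
      if (diskFull) {
         onMemoryFreed.add(runWhenAvailable);
         return false;
      }
      runWhenAvailable.run();
      return true;
   }

   synchronized void block() {
      manuallyBlocked = true;
   }

   synchronized void unblock() {
      manuallyBlocked = false;
      resubmit(onManualUnblock);
   }

   synchronized void setDiskFull(boolean full) {
      diskFull = full;
      if (!full) {
         resubmit(onMemoryFreed);
      }
   }

   // Re-runs the checks for each parked action instead of running it directly,
   // so an action is parked again if the other reason still applies.
   private void resubmit(Queue<Runnable> parked) {
      List<Runnable> pending = new ArrayList<>(parked);
      parked.clear();
      for (Runnable action : pending) {
         checkMemory(action);
      }
   }
}
```

The design choice here is that releasing one condition never runs an action
outright; it only re-evaluates it, so an action runs once neither condition
holds.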
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated()
throws Exception {
}
}
+ @Test(timeout = 60000)
+ public void
testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked()
throws Exception {
+ AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AddressControl addressControl =
ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()),
mBeanServer);
+ addressControl.block();
+ AmqpSession session = connection.createSession();
+ final AmqpSender sender = session.createSender(getQueueName());
+ assertEquals("Should get 0 credit", 0,
sender.getSender().getCredit());
+
+ addressControl.unBlock();
+ assertTrue("Should now get issued one credit", Wait.waitFor(new
Wait.Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return 1 == sender.getSender().getCredit();
+ }
+ }));
Review comment:
The default check interval is 100ms; I would set a smaller one for this
(and a smaller limit than the default 30sec), as it's likely not to happen on
the first check, but shouldn't take anything like that long to then arrive.
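
For illustration, a polling wait with an explicit short interval and bounded
timeout might look like the sketch below. ShortWait.waitFor is a hypothetical
helper written for this example; it stands in for the test utility and is not
its actual implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper with caller-chosen timeout and check interval.
final class ShortWait {
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long intervalMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (true) {
         if (condition.getAsBoolean()) {
            return true;
         }
         if (System.nanoTime() - deadline >= 0) {
            // Timed out without the condition becoming true.
            return false;
         }
         try {
            Thread.sleep(intervalMillis);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
         }
      }
   }
}
```

A call such as ShortWait.waitFor(() -> 1 == credit.get(), 5000, 10), where
credit is whatever the test reads the credit from, returns within roughly one
interval of the credit arriving rather than up to 100ms later.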
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -130,6 +162,51 @@ public void
testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
}
+ @Test(timeout = 10000)
+ public void testSendHangsWhenBlockedAndNotAfterUnBlocked() throws Exception
{
Review comment:
```suggestion
public void testSendBlocksWhenAddressBlockedAndCompletesAfterUnblocked()
throws Exception {
```
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java
##########
@@ -201,4 +201,16 @@ public TargetResult getTarget(String key) {
return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
}
+
+ public void setLocalTargetFilter(String regExp) {
+ if (regExp == null || regExp.isBlank()) {
Review comment:
Advanced Java 11 features! :D ;)
##########
File path:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java
##########
@@ -85,6 +89,34 @@ public void testCreditsAreAllocatedOnceOnLinkCreated()
throws Exception {
}
}
+ @Test(timeout = 60000)
+ public void
testCreditsAreNotAllocatedOnceOnLinkCreatedWhileBlockedAndWhenUnBlocked()
throws Exception {
+ AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AddressControl addressControl =
ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()),
mBeanServer);
+ addressControl.block();
+ AmqpSession session = connection.createSession();
+ final AmqpSender sender = session.createSender(getQueueName());
+ assertEquals("Should get 0 credit", 0,
sender.getSender().getCredit());
Review comment:
This test is susceptible to the inherent race of checking credit, as
credit is granted separately and so may not have arrived before the create
method returned and you check it. It should probably use a very small wait
(e.g. 5-10ms) here to give more confidence in this state, and a small window
in which it could fail if some credit should arrive unexpectedly before
checking it, and before/regardless of the unblock call.
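
A rough sketch of such a "stays at zero for a short window" check follows.
CreditWindowCheck.staysAt is a hypothetical helper, and the IntSupplier is an
assumed stand-in for reading the sender's credit.

```java
import java.util.function.IntSupplier;

// Hypothetical helper: verifies a value holds steady for a short window.
final class CreditWindowCheck {
   static boolean staysAt(int expected, IntSupplier current, long windowMillis, long intervalMillis) {
      final long deadline = System.nanoTime() + windowMillis * 1_000_000L;
      do {
         if (current.getAsInt() != expected) {
            // The value changed inside the window, e.g. credit arrived unexpectedly.
            return false;
         }
         try {
            Thread.sleep(intervalMillis);
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
         }
      } while (System.nanoTime() - deadline < 0);
      // Final read once the window has elapsed.
      return current.getAsInt() == expected;
   }
}
```

This cannot prove credit will never arrive, but a 5-10ms window gives a late
grant some chance to show up and fail the test before the unblock call is made.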