This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new f7485a63c6 ARTEMIS-5536 Improving reliability of openwire flow control
f7485a63c6 is described below
commit f7485a63c61ab88deed55807748c7c815a910e0c
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Jun 9 17:07:22 2025 -0400
ARTEMIS-5536 Improving reliability of openwire flow control
- AMQConsumer used to discount credits ahead of taking them on rollback,
that would lead to negative values
- AMQConsumer.close() was blocking an operation
- fixing other branches for flow control
---
.../artemis/utils/actors/ProcessorBase.java | 23 +++-
.../artemis/utils/actors/PauseActorTest.java | 96 ++++++++++++++
.../core/protocol/openwire/OpenWireConnection.java | 8 ++
.../core/protocol/openwire/amq/AMQConsumer.java | 95 +++++++++++---
.../protocol/openwire/amq/AMQConsumerTest.java | 2 +-
.../openwire/amq/JMSPullConsumerTest.java | 24 ++--
.../openwire/amq/RedeliveryPolicyTest.java | 2 +-
.../openwire/amq/SimpleConsumerLargeCountTest.java | 140 +++++++++++++++++++++
.../openwire/amq/ValidateAddressSizeTest.java | 2 +-
9 files changed, 358 insertions(+), 34 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 11d282075b..a0869a3bba 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -33,6 +33,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
public static final int STATE_NOT_RUNNING = 0;
public static final int STATE_RUNNING = 1;
public static final int STATE_FORCED_SHUTDOWN = 2;
+ public static final int STATE_PAUSED = 3;
protected final Queue<T> tasks = new ConcurrentLinkedQueue<>();
@@ -47,7 +48,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
@SuppressWarnings("unused")
private volatile int state = STATE_NOT_RUNNING;
// Request of forced shutdown
- private volatile boolean requestedForcedShutdown = false;
+ private volatile boolean pausedProcessing = false;
// Request of educated shutdown:
protected volatile boolean requestedShutdown = false;
// Request to yield to another thread
@@ -63,8 +64,8 @@ public abstract class ProcessorBase<T> extends HandlerBase {
try {
T task;
//while the queue is not empty we process in order:
- //if requestedForcedShutdown==true than no new tasks will be
drained from the tasks q.
- while (!yielded && !requestedForcedShutdown && (task =
tasks.poll()) != null) {
+ //if pausedProcessing==true then no new tasks will be drained
from the tasks q.
+ while (!yielded && !pausedProcessing && (task = tasks.poll())
!= null) {
doTask(task);
}
} finally {
@@ -82,7 +83,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
//but poll() has returned null, so a submitting thread will believe
that it does not need re-execute.
//this check fixes the issue
}
- while (!tasks.isEmpty() && !requestedShutdown && !yielded);
+ while (!tasks.isEmpty() && !requestedShutdown && !yielded &&
!pausedProcessing);
if (yielded) {
yielded = false;
@@ -90,6 +91,18 @@ public abstract class ProcessorBase<T> extends HandlerBase {
}
}
+ public void pauseProcessing() {
+ pausedProcessing = true;
+ stateUpdater.set(this, STATE_PAUSED);
+ }
+
+ public void resumeProcessing() {
+ pausedProcessing = false;
+ if (stateUpdater.compareAndSet(this, STATE_PAUSED, STATE_NOT_RUNNING)) {
+ delegate.execute(task);
+ }
+ }
+
/**
* It will shutdown and wait 30 seconds for timeout.
*/
@@ -121,7 +134,7 @@ public abstract class ProcessorBase<T> extends HandlerBase {
*/
public int shutdownNow(Consumer<? super T> onPendingItem, int timeout,
TimeUnit unit) {
//alert anyone that has been requested (at least) an immediate shutdown
- requestedForcedShutdown = true;
+ pausedProcessing = true;
requestedShutdown = true;
yielded = false;
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/PauseActorTest.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/PauseActorTest.java
new file mode 100644
index 0000000000..a2d8631179
--- /dev/null
+++
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/PauseActorTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.actors;
+
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PauseActorTest {
+
+ Actor<Integer> actor;
+
+ HashSet<Integer> receivedValues = new HashSet<>();
+ CountDownLatch paused = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(1);
+
+ int firstPauseElements = 50;
+ int firstSend = 80;
+ int totalElements = 100;
+
+ public void doInteger(Integer received) {
+ receivedValues.add(received);
+ if (received.equals(Integer.valueOf(firstPauseElements - 1))) {
+ actor.pauseProcessing();
+ paused.countDown();
+ } else if (received.equals(Integer.valueOf(totalElements - 1))) {
+ done.countDown();
+ }
+ }
+
+ @Test
+ public void testPauseActor() throws Exception {
+ final ExecutorService executorService =
Executors.newSingleThreadExecutor();
+ try {
+ actor = new Actor<>(executorService, this::doInteger);
+
+ for (int i = 0; i < firstSend; i++) {
+ actor.act(i);
+ }
+
+ actor.flush();
+ assertTrue(paused.await(10, TimeUnit.SECONDS));
+
+ Wait.assertEquals(firstPauseElements, () -> receivedValues.size());
+
+ for (int i = firstSend; i < totalElements; i++) {
+ actor.act(i);
+ }
+
+ actor.flush();
+ // it should be unchanged even after adding more elements as the
actor is paused
+ Wait.assertEquals(firstPauseElements, () -> receivedValues.size());
+
+ for (int i = 0; i < totalElements; i++) {
+ if (i < firstPauseElements) {
+ assertTrue(receivedValues.contains(i));
+ } else {
+ assertFalse(receivedValues.contains(i));
+ }
+ }
+
+ actor.resumeProcessing();
+ actor.flush();
+ assertTrue(done.await(10, TimeUnit.SECONDS));
+ Wait.assertEquals(totalElements, () -> receivedValues.size());
+
+ for (int i = 0; i < totalElements; i++) {
+ assertTrue(receivedValues.contains(i));
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index ba493f9295..cf405e4b19 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -208,6 +208,14 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
private String validatedUser = null;
+ public void block() {
+ openWireActor.pauseProcessing();
+ }
+
+ public void unblock() {
+ openWireActor.resumeProcessing();
+ }
+
public OpenWireConnection(Connection connection,
ActiveMQServer server,
OpenWireProtocolManager openWireProtocolManager,
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 4ac10382b5..ea8c4ecf04 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -38,6 +38,8 @@ import
org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants;
import
org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -48,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SelectorTranslator;
@@ -263,9 +266,10 @@ public class AMQConsumer {
return info.getConsumerId();
}
- public void acquireCredit(int n, boolean delivered) {
+ public void replenishCredit(int n, boolean delivered) {
if (messagePullHandler.get() != null) {
- //don't acquire any credits when the pull handler controls it!!
+ logger.debug("replenishCredit of {} ignored, as there's no
messagePullHandler", n);
+ //don't replenish any credits when the pull handler controls it!!
return;
}
@@ -274,6 +278,7 @@ public class AMQConsumer {
} else if (deliveredAcksCreditExtension > 0) {
if (deliveredAcksCreditExtension < n) {
n -= deliveredAcksCreditExtension;
+ logger.debug("Requested more credits than the limit, maximizing
the request as {}", n);
deliveredAcksCreditExtension = 0;
} else {
deliveredAcksCreditExtension -= n;
@@ -282,6 +287,9 @@ public class AMQConsumer {
}
int oldwindow = currentWindow.getAndAdd(n);
+ if (logger.isDebugEnabled()) {
+ logger.debug("replenished {} credits, current after replenish = {},
old ={}", n, currentWindow.get(), oldwindow);
+ }
boolean promptDelivery = oldwindow < prefetchSize;
@@ -307,7 +315,10 @@ public class AMQConsumer {
reference.setProtocolData(MessageId.class,
dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
// Prevent races with other updates that can lead to credit going
negative and starving consumers.
- currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+ currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Decremented credit, current={}",
currentWindow.get());
+ }
return size;
} catch (Throwable t) {
logger.warn("Error during message dispatch", t);
@@ -315,6 +326,17 @@ public class AMQConsumer {
}
}
+ static int decrementWindow(int value) {
+ if (value > 0) {
+ return value - 1;
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("preventing the credit window from going negative,
keeping value at {}", value, new Exception("trace"));
+ }
+ return value;
+ }
+ }
+
public void handleDeliverNullDispatch() {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(getId());
@@ -334,9 +356,15 @@ public class AMQConsumer {
return;
}
+ logger.debug("Acking {}", ack);
+
final int ackMessageCount = ack.getMessageCount();
+
+ // this is for deliveredACK on OpenWire. Meaning the message is still
being held by the client.
+ // it's just to request more credits
if (ack.isDeliveredAck()) {
- acquireCredit(ackMessageCount, true);
+ logger.debug("replinishCredit credit on {} credits, for
DELIVERED_ACK_TYPE", ackMessageCount);
+ replenishCredit(ackMessageCount, true);
// our work is done
return;
}
@@ -350,12 +378,13 @@ public class AMQConsumer {
if (!ackList.isEmpty() || !removeReferences ||
serverConsumer.getQueue().isTemporary()) {
- // valid match in delivered or browsing or temp - deal with credit
- acquireCredit(ackMessageCount, false);
-
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {
ref.getQueue().expire(ref, serverConsumer, true);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Acquiring 1 credit for EXPIRED_ACK_TYPE");
+ }
+ replenishCredit(1, false);
}
} else if (removeReferences) {
@@ -367,6 +396,14 @@ public class AMQConsumer {
} else {
transaction = originalTX;
}
+ transaction.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ // these credits will only be requested if the commit
actually succeeds
+ logger.debug("replenishing {} credits after committed",
ackMessageCount);
+ replenishCredit(ackMessageCount, false);
+ }
+ });
if (ack.isIndividualAck() || ack.isStandardAck()) {
for (MessageReference ref : ackList) {
@@ -390,6 +427,9 @@ public class AMQConsumer {
if (originalTX == null) {
transaction.commit(true);
}
+ } else {
+ logger.debug("Replenishing {} credits without any transaction
(browsing perhaps)", ackMessageCount);
+ replenishCredit(ackMessageCount, false);
}
}
}
@@ -413,6 +453,9 @@ public class AMQConsumer {
public void processMessagePull(MessagePull messagePull) throws Exception {
currentWindow.incrementAndGet();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Incremented credit for processMessagePull, current =
{}", currentWindow.get());
+ }
MessagePullHandler pullHandler = messagePullHandler.get();
if (pullHandler != null) {
pullHandler.nextSequence(messagePullSequence++,
messagePull.getTimeout());
@@ -424,10 +467,22 @@ public class AMQConsumer {
if (delayedDispatchPrompter != null) {
delayedDispatchPrompter.cancel(false);
}
- if (info.getPrefetchSize() > 1) {
- // because response required is false on a RemoveConsumerCommand, a
new consumer could miss canceled prefetched messages
- // we await the operation context completion before handling a
subsequent command
- session.getCoreSession().getSessionContext().waitCompletion();
+ OpenWireConnection openWireConnection = session.getConnection();
+ if (openWireConnection != null) {
+ openWireConnection.block();
+ session.getCoreSession().getSessionContext().executeOnCompletion(new
IOCallback() {
+ @Override
+ public void done() {
+ session.getConnection().unblock();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ // because response required is false on a
RemoveConsumerCommand, a new consumer could miss canceled prefetched messages
+ // we await the operation context completion before handling a
subsequent command
+ session.getConnection().unblock();
+ }
+ });
}
}
@@ -438,6 +493,9 @@ public class AMQConsumer {
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
this.currentWindow.set(prefetchSize);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Updating credits with a new prefetchSize",
prefetchSize, new Exception("trace"));
+ }
this.info.setPrefetchSize(prefetchSize);
if (this.prefetchSize == 0) {
messagePullHandler.compareAndSet(null, new MessagePullHandler());
@@ -493,7 +551,10 @@ public class AMQConsumer {
if (next >= 0) {
if (timeout <= 0) {
// Prevent races with other updates that can lead to credit
going negative and starving consumers.
- currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+ currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+ if (logger.isDebugEnabled()) {
+ logger.debug("UpdateCredit down on FORCED_DELIVERY,
current after decrement={}", currentWindow.get());
+ }
latch.countDown();
} else {
messagePullFuture = scheduledPool.schedule(() -> {
@@ -502,7 +563,10 @@ public class AMQConsumer {
// can race with an actual message arriving so we must
ensure we don't reduce
// credit below zero as we want credit to always be
zero or on active pull it
// should be one (greater than one indicates a broken
client implementation).
- currentWindow.updateAndGet(i -> i > 0 ? i - 1 : i);
+
currentWindow.updateAndGet(AMQConsumer::decrementWindow);
+ if (logger.isDebugEnabled()) {
+ logger.debug("UpdateCredit down on FORCED_DELIVERY,
current after decrement={}", currentWindow.get());
+ }
handleDeliverNullDispatch();
}
}, timeout, TimeUnit.MILLISECONDS);
@@ -528,7 +592,10 @@ public class AMQConsumer {
}
public void addRolledback(MessageReference messageReference) {
- currentWindow.decrementAndGet();
+ // Note:
+ // We used to call currentWindow.decrement here.
+ // this is not needed any more as we won't be taking credits unless a
commit happened.
+
getRolledbackMessageRefsOrCreate().add(messageReference);
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
index c302f98aff..152dad95a1 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
@@ -121,7 +121,7 @@ public class AMQConsumerTest {
assertFalse(consumer.hasCredits());
- consumer.acquireCredit(1, true);
+ consumer.replenishCredit(1, true);
assertTrue(consumer.hasCredits());
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
index 318af3233f..279b961315 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSPullConsumerTest.java
@@ -61,19 +61,19 @@ public class JMSPullConsumerTest extends BasicOpenWireTest {
destination = createDestination(session, destinationType);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(destination);
- assertNull(consumer.receive(100));
- assertNull(consumer.receive(100));
- assertNull(consumer.receive(100));
+ assertNull(consumer.receive(10));
+ assertNull(consumer.receive(10));
+ assertNull(consumer.receive(10));
sendMessages(session, destination, 3);
- assertNotNull(consumer.receive(100));
+ assertNotNull(consumer.receive(10));
ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer)
session.createConsumer(destination);
ActiveMQMessageConsumer consumer3 = (ActiveMQMessageConsumer)
session.createConsumer(destination);
- assertNotNull(consumer2.receive(100));
- assertNotNull(consumer3.receive(100));
+ assertNotNull(consumer2.receive(10));
+ assertNotNull(consumer3.receive(10));
}
@TestTemplate
@@ -86,17 +86,17 @@ public class JMSPullConsumerTest extends BasicOpenWireTest {
destination = createDestination(session, destinationType);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(destination);
- assertNull(consumer.receive(100));
- assertNull(consumer.receive(100));
- assertNull(consumer.receive(100));
+ assertNull(consumer.receive(10));
+ assertNull(consumer.receive(10));
+ assertNull(consumer.receive(10));
sendMessages(session, destination, 3);
ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer)
session.createConsumer(destination);
- assertNotNull(consumer2.receive(100));
- assertNotNull(consumer2.receive(100));
- assertNotNull(consumer2.receive(100));
+ assertNotNull(consumer2.receive(1000));
+ assertNotNull(consumer2.receive(1000));
+ assertNotNull(consumer2.receive(1000));
assertNull(consumer.receiveNoWait());
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index 951f66960c..02e146f912 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -474,7 +474,7 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest
{
assertNotNull(m, "null@:" + i);
session.commit();
- assertTrue(queueControl.getDeliveringCount() <= prefetchSize + 1,
"deliveryCount: " + queueControl.getDeliveringCount() + " @:" + i);
+ assertTrue(queueControl.getDeliveringCount() <= prefetchSize,
"deliveringCount: " + queueControl.getDeliveringCount() + " @:" + i);
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/SimpleConsumerLargeCountTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/SimpleConsumerLargeCountTest.java
new file mode 100644
index 0000000000..f88b6694b0
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/SimpleConsumerLargeCountTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This test is to make sure a consumer on a transaction session, would
receive all messages even
+ * when the number of messages is bigger than prefetch.
+ * <p>
+ * Basically I am making sure flow control will still issue credits even
though the messages are not removed with a (ACK mode = remove)
+ */
+public class SimpleConsumerLargeCountTest extends BasicOpenWireTest {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Test
+ public void testTopicReceiverFlowControlled() throws Exception {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ ActiveMQTopic destination = (ActiveMQTopic)
this.createDestination(session, ActiveMQDestination.TOPIC_TYPE);
+ connection.start();
+
+ // trying to starve the server if there's anything blocking on a create
resources
+ int numberOfSubscriptions = 50;
+ // more messages than prefetch
+ int numberOfMessages = 500;
+ int rollbackAt = 50;
+
+ ExecutorService service =
Executors.newFixedThreadPool(numberOfSubscriptions);
+ runAfter(service::shutdownNow);
+ AtomicInteger errors = new AtomicInteger(0);
+
+ CountDownLatch done = new CountDownLatch(numberOfSubscriptions);
+
+ CyclicBarrier startFlag = new CyclicBarrier(numberOfSubscriptions + 1);
+ for (int dest = 0; dest < numberOfSubscriptions; dest++) {
+ final int finalDest = dest;
+ service.execute(() -> {
+ Connection connConsumer = null;
+ try {
+ connConsumer = factory.createConnection();
+ connConsumer.setClientID("client" + finalDest);
+ Session sessConsumer = connConsumer.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer =
sessConsumer.createDurableSubscriber(destination, "cons" + finalDest);
+ connConsumer.start();
+ startFlag.await(10, TimeUnit.SECONDS);
+
+ for (int rollbackNumber = 0; rollbackNumber < 2;
rollbackNumber++) {
+ for (int i = 0; i < rollbackAt; i++) {
+ TextMessage message = (TextMessage)
consumer.receive(5000);
+ logger.debug("Received {}, dest={}", i, finalDest);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("i"));
+ }
+ // this is just to make the test more challenging. rollback
destinations, make sure they won't block
+ // and flow control would still be okay on sending more
messages
+ sessConsumer.rollback();
+ }
+
+ consumer.close();
+ consumer = sessConsumer.createDurableSubscriber(destination,
"cons" + finalDest);
+
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ logger.debug("Received {}, dest={}", i, finalDest);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("i"));
+ }
+ sessConsumer.commit();
+ assertNull(consumer.receiveNoWait());
+
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ errors.incrementAndGet();
+ } finally {
+ if (connConsumer != null) {
+ try {
+ connConsumer.close();
+ } catch (Throwable ignored) {
+ }
+ }
+ done.countDown();
+ }
+ });
+ }
+
+ startFlag.await(10, TimeUnit.SECONDS);
+
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = session.createTextMessage("Message: " + i);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ session.commit();
+
+ assertTrue(done.await(1, TimeUnit.MINUTES));
+ assertEquals(0, errors.get());
+ service.shutdownNow();
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ }
+}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
index 6e1b096f00..a4ca962d76 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ValidateAddressSizeTest.java
@@ -103,7 +103,7 @@ public class ValidateAddressSizeTest extends
BasicOpenWireTest {
String messageBody = "a".repeat(messageBodySize);
String largeMessageBody = "a".repeat(largeMessageSize);
- int consumers = 10;
+ int consumers = 200;
int numberOfMessages = 10;
int numberOfLargeMessages = 5;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact