This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 78c0792 ARTEMIS-3065 AMQP Anonymous producer would eventually block
new 697886a This closes #3406
78c0792 is described below
commit 78c0792989137a69c31f7310c948a475be2bad7d
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Jan 12 14:52:33 2021 -0500
ARTEMIS-3065 AMQP Anonymous producer would eventually block
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 16 ++-
.../amqp/logger/ActiveMQAMQPProtocolLogger.java | 5 +
.../amqp/proton/ProtonServerReceiverContext.java | 28 +++-
.../artemis/core/paging/PagingManager.java | 9 ++
.../core/paging/impl/PagingManagerImpl.java | 12 ++
.../paging/AnonymousProducerPageTest.java | 159 +++++++++++++++++++++
6 files changed, 226 insertions(+), 3 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 9d165d7..eb62855 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -138,6 +139,10 @@ public class AMQPSessionCallback implements
SessionCallback {
return coreMessageObjectPools;
}
+ /**
+ * Exposes the protocol manager held by this session callback.
+ *
+ * @return the {@code manager} field of this callback
+ */
+ public ProtonProtocolManager getProtocolManager() {
+ return manager;
+ }
+
@Override
public boolean isWritable(ReadyListener callback, Object protocolContext) {
ProtonServerSenderContext senderContext = (ProtonServerSenderContext)
protocolContext;
@@ -603,10 +608,17 @@ public class AMQPSessionCallback implements
SessionCallback {
public void flow(final SimpleString address,
Runnable runnable) {
try {
- PagingManager pagingManager = manager.getServer().getPagingManager();
if (address == null) {
- pagingManager.checkMemory(runnable);
+ PagingManager pagingManager =
manager.getServer().getPagingManager();
+ if (manager != null && manager.getServer() != null &&
+ manager.getServer().getAddressSettingsRepository() != null &&
+
manager.getServer().getAddressSettingsRepository().getMatch("#").getAddressFullMessagePolicy().equals(AddressFullMessagePolicy.PAGE))
{
+ // If it's paging, we only check for disk full
+ pagingManager.checkStorage(runnable);
+ } else {
+ pagingManager.checkMemory(runnable);
+ }
} else {
final PagingStore store =
manager.getServer().getPagingManager().getPageStore(address);
if (store != null) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
index 0889795..e53b800 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
@@ -66,4 +66,9 @@ public interface ActiveMQAMQPProtocolLogger extends
BasicLogger {
"\nSuccess on Server AMQP Connection {0} on {1} after {2} retries" +
"\n*******************************************************************************************************************************\n",
format = Message.Format.MESSAGE_FORMAT)
void successReconnect(String name, String hostAndPort, int currentRetry);
+
+ /**
+ * Logs warning AMQ111004 about two destinations used by the same anonymous producer
+ * having different address-full policies.
+ *
+ * @param oldAddress substituted for {0}: the previously used destination
+ * @param oldPolicy substituted for {1}: the address-full policy of {@code oldAddress}
+ * @param newAddress substituted for {2}: the newly used destination
+ * @param newPolicy substituted for {3}: the address-full policy of {@code newAddress}
+ */
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 111004, value = "AddressFullPolicy clash on an anonymous
producer between destinations {0}(addressFullPolicy={1}) and
{2}(addressFullPolicy={3}). This could lead to semantic inconsistencies on your
clients. Notice you could have other instances of this scenario however this
message will only be logged once. log.debug output would show all instances of
this event.",
+ format = Message.Format.MESSAGE_FORMAT)
+ void incompatibleAddressFullMessagePolicy(String oldAddress, String
oldPolicy, String newAddress, String newPolicy);
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 6be3a18..0457b76 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -34,6 +35,7 @@ import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
+import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -57,6 +59,9 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
private static final Logger log =
Logger.getLogger(ProtonServerReceiverContext.class);
protected SimpleString address;
+ protected SimpleString lastAddress;
+ protected AddressFullMessagePolicy lastAddressPolicy;
+ protected boolean addressAlreadyClashed = false;
protected final Runnable spiFlow = this::sessionSPIFlow;
@@ -174,6 +179,10 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
protected void actualDelivery(AMQPMessage message, Delivery delivery,
Receiver receiver, Transaction tx) {
try {
if (sessionSPI != null) {
+ // message could be null on unit tests (Mocking from
ProtonServerReceiverContextTest).
+ if (address == null && message != null) {
+ validateAddressOnAnonymousLink(message);
+ }
sessionSPI.serverSend(this, tx, receiver, delivery, address,
routingContext, message);
}
} catch (Exception e) {
@@ -184,6 +193,23 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
}
}
+   /**
+    * Tracks the destination of the latest message received on an anonymous link (a link whose
+    * own {@code address} is null) and reports when two consecutive destinations use different
+    * address-full policies.
+    * <p>
+    * The warning AMQ111004 is emitted at most once per receiver (guarded by
+    * {@code addressAlreadyClashed}); every clash is additionally reported at debug level.
+    * {@code lastAddress} and {@code lastAddressPolicy} are updated whenever the destination
+    * changes.
+    *
+    * @param message the incoming message; its address is the destination being validated
+    * @throws Exception if the page store lookup fails
+    */
+   private void validateAddressOnAnonymousLink(AMQPMessage message) throws Exception {
+      SimpleString newAddress = message.getAddressSimpleString();
+      if (newAddress == null || newAddress.equals(lastAddress)) {
+         // nothing to validate: no destination on the message, or same destination as before
+         return;
+      }
+
+      // NOTE(review): getPageStore() is null-checked by AMQPSessionCallback.flow(); here a null
+      // store would raise a NullPointerException - confirm whether that can happen on this path.
+      AddressFullMessagePolicy currentPolicy = sessionSPI.getProtocolManager().getServer().getPagingManager().getPageStore(newAddress).getAddressFullMessagePolicy();
+
+      if (lastAddressPolicy != null && lastAddressPolicy != currentPolicy) {
+         if (!addressAlreadyClashed) {
+            addressAlreadyClashed = true; // print the warning only once
+            ActiveMQAMQPProtocolLogger.LOGGER.incompatibleAddressFullMessagePolicy(lastAddress.toString(), "" + lastAddressPolicy, newAddress.toString(), "" + currentPolicy);
+         }
+
+         // Report the policy of the NEW address on the right-hand side (the previous code
+         // printed lastAddressPolicy twice), and only build the string when debug is enabled.
+         if (log.isDebugEnabled()) {
+            log.debug("AddressFullPolicy clash between " + lastAddress + "/" + lastAddressPolicy + " and " + newAddress + "/" + currentPolicy);
+         }
+      }
+
+      this.lastAddress = newAddress;
+      this.lastAddressPolicy = currentPolicy;
+   }
+
public void deliveryFailed(Delivery delivery, Receiver receiver, Exception
e) {
connection.runNow(() -> {
DeliveryState deliveryState = determineDeliveryState(((Source)
receiver.getSource()),
@@ -262,7 +288,7 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
connection.requireInHandler();
// Use the SessionSPI to allocate producer credits, or default, always
allocate credit.
if (sessionSPI != null) {
- sessionSPI.flow(address, creditRunnable);
+ sessionSPI.flow(address != null ? address : lastAddress,
creditRunnable);
} else {
creditRunnable.run();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index 3489737..9586fa5 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -121,4 +121,13 @@ public interface PagingManager extends ActiveMQComponent,
HierarchicalRepository
*/
void checkMemory(Runnable runWhenAvailable);
+
+ /**
+ * Use this when you have no reference to an address (anonymous AMQP producers, for example).
+ * <p>
+ * The default implementation simply delegates to {@link #checkMemory(Runnable)}.
+ *
+ * @param runWhenAvailable the callback; the default implementation hands it to {@code checkMemory}
+ */
+ default void checkStorage(Runnable runWhenAvailable) {
+ checkMemory(runWhenAvailable);
+ }
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 4f930b7..3e10291 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -120,6 +120,10 @@ public final class PagingManagerImpl implements
PagingManager {
this.managementAddress = managementAddress;
}
+ /**
+ * Returns the value of the {@code maxSize} field.
+ * NOTE(review): presumably the global max-size limit of this paging manager - confirm against the constructor.
+ *
+ * @return the configured {@code maxSize}
+ */
+ public long getMaxSize() {
+ return maxSize;
+ }
+
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository) {
this(pagingSPI, addressSettingsRepository, -1, null);
@@ -255,6 +259,14 @@ public final class PagingManagerImpl implements
PagingManager {
runWhenAvailable.run();
}
+   /**
+    * Runs the callback right away unless the disk is currently flagged as full, in which case
+    * the callback is wrapped by {@code AtomicRunnable.checkAtomic} and queued on
+    * {@code memoryCallback} instead of being executed.
+    *
+    * @param runWhenAvailable the callback to run now, or to queue while {@code diskFull} is set
+    */
+   @Override
+   public void checkStorage(Runnable runWhenAvailable) {
+      if (!diskFull) {
+         runWhenAvailable.run();
+      } else {
+         memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+      }
+   }
private void memoryReleased() {
Runnable runnable;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AnonymousProducerPageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AnonymousProducerPageTest.java
new file mode 100644
index 0000000..9e232c0
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AnonymousProducerPageTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Verifies that a producer created without a destination (anonymous producer) can still send
+ * after the paging manager's size has been pushed beyond its max size while the catch-all
+ * address setting uses the PAGE policy. The test runs once per protocol (AMQP, CORE, OPENWIRE);
+ * for AMQP it additionally checks that warning AMQ111004 is logged once when the producer
+ * switches to a destination configured with the BLOCK policy.
+ */
+@RunWith(Parameterized.class)
+public class AnonymousProducerPageTest extends ActiveMQTestBase {
+
+   protected static final String NETTY_ACCEPTOR = "netty-acceptor";
+
+   // Single definition of the port shared by the acceptor and the client URL.
+   private static final int PORT = 5672;
+
+   protected final String protocol;
+
+   ActiveMQServer server;
+
+   /**
+    * @return one parameter row per protocol under test
+    */
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection<Object[]> getParams() {
+      return Arrays.asList(new Object[][]{
+         {"AMQP"}, {"CORE"}, {"OPENWIRE"}});
+   }
+
+   public AnonymousProducerPageTest(String protocol) {
+      this.protocol = protocol;
+   }
+
+   /**
+    * Creates and starts a server whose only address setting is "#" with the PAGE policy and
+    * whose only acceptor is the one built by {@link #addAcceptorConfiguration}.
+    */
+   @Before
+   public void createServer() throws Exception {
+
+      this.server = addServer(this.createServer(true, true));
+
+      server.getConfiguration().getAddressesSettings().clear();
+      server.getConfiguration().addAddressesSetting("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
+
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, PORT));
+      server.getConfiguration().setName(getName());
+      server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + PORT);
+      server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + PORT);
+      server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + PORT);
+      // NOTE(review): unlike the directories above, this one is re-set to its current value
+      // without the port suffix - confirm whether that is intended.
+      server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory());
+      server.getConfiguration().setJMXManagementEnabled(true);
+      server.getConfiguration().setMessageExpiryScanPeriod(100);
+      server.start();
+   }
+
+   /**
+    * Builds the acceptor configuration listening on {@code port} for the protocols returned by
+    * {@link #getConfiguredProtocols()}, giving subclasses two hooks to customize it.
+    */
+   protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
+      HashMap<String, Object> amqpParams = new HashMap<>();
+      configureAMQPAcceptorParameters(amqpParams);
+      TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams);
+      configureAMQPAcceptorParameters(tc);
+      return tc;
+   }
+
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      // None by default
+   }
+
+   protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
+      // None by default
+   }
+
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test(timeout = 60_000)
+   public void testNotBlockOnGlobalMaxSizeWithAnonymousProduce() throws Exception {
+      final int MSG_SIZE = 1000;
+      final StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < MSG_SIZE; i++) {
+         builder.append('0');
+      }
+      final String data = builder.toString();
+      final int MSG_COUNT = 3_000;
+
+      // artificially grow the accounted size beyond max size
+      // (the long max size is narrowed to int for addSize, as in the original call)
+      server.getPagingManager().addSize((int) ((PagingManagerImpl) server.getPagingManager()).getMaxSize());
+      server.getPagingManager().addSize(100_000);
+
+      server.getAddressSettingsRepository().addMatch("blockedQueue", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + PORT);
+      Connection connection = factory.createConnection();
+      try {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer producer = session.createProducer(null);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         javax.jms.Queue jmsQueue = session.createQueue(getName());
+
+         for (int i = 0; i < MSG_COUNT; i++) {
+            TextMessage message = session.createTextMessage(data);
+            producer.send(jmsQueue, message);
+         }
+         session.commit();
+         if (protocol.equals("AMQP")) {
+            // this is only valid for AMQP
+            validatePolicyMismatch(session, producer);
+         }
+      } finally {
+         // close the client connection even when a send or an assertion fails
+         connection.close();
+      }
+   }
+
+   /**
+    * Sends to a BLOCK-policy queue and then back to the PAGE-policy queue, asserting that
+    * AMQ111004 shows up in the captured log after the first switch and not again after the
+    * second one.
+    */
+   private void validatePolicyMismatch(Session session, MessageProducer producer) throws JMSException {
+      AssertionLoggerHandler.startCapture();
+      try {
+         producer.send(session.createQueue("blockedQueue"), session.createMessage());
+         session.commit();
+         Assert.assertTrue(AssertionLoggerHandler.findText("AMQ111004"));
+         AssertionLoggerHandler.clear();
+         producer.send(session.createQueue(getName()), session.createMessage());
+         session.commit();
+         Assert.assertFalse("The warning should be printed only once", AssertionLoggerHandler.findText("AMQ111004"));
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
+
+}