This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c0792  ARTEMIS-3065 AMQP Anonymous producer would eventually block
     new 697886a  This closes #3406
78c0792 is described below

commit 78c0792989137a69c31f7310c948a475be2bad7d
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Jan 12 14:52:33 2021 -0500

    ARTEMIS-3065 AMQP Anonymous producer would eventually block
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  16 ++-
 .../amqp/logger/ActiveMQAMQPProtocolLogger.java    |   5 +
 .../amqp/proton/ProtonServerReceiverContext.java   |  28 +++-
 .../artemis/core/paging/PagingManager.java         |   9 ++
 .../core/paging/impl/PagingManagerImpl.java        |  12 ++
 .../paging/AnonymousProducerPageTest.java          | 159 +++++++++++++++++++++
 6 files changed, 226 insertions(+), 3 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 9d165d7..eb62855 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerProducer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -138,6 +139,10 @@ public class AMQPSessionCallback implements 
SessionCallback {
       return coreMessageObjectPools;
    }
 
+   public ProtonProtocolManager getProtocolManager() {
+      return manager;
+   }
+
    @Override
    public boolean isWritable(ReadyListener callback, Object protocolContext) {
       ProtonServerSenderContext senderContext = (ProtonServerSenderContext) 
protocolContext;
@@ -603,10 +608,17 @@ public class AMQPSessionCallback implements 
SessionCallback {
    public void flow(final SimpleString address,
                     Runnable runnable) {
       try {
-         PagingManager pagingManager = manager.getServer().getPagingManager();
 
          if (address == null) {
-            pagingManager.checkMemory(runnable);
+            PagingManager pagingManager = 
manager.getServer().getPagingManager();
+            if (manager != null && manager.getServer() != null &&
+                manager.getServer().getAddressSettingsRepository() != null &&
+                
manager.getServer().getAddressSettingsRepository().getMatch("#").getAddressFullMessagePolicy().equals(AddressFullMessagePolicy.PAGE))
 {
+               // If it's paging, we only check for disk full
+               pagingManager.checkStorage(runnable);
+            } else {
+               pagingManager.checkMemory(runnable);
+            }
          } else {
             final PagingStore store = 
manager.getServer().getPagingManager().getPageStore(address);
             if (store != null) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
index 0889795..e53b800 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
@@ -66,4 +66,9 @@ public interface ActiveMQAMQPProtocolLogger extends 
BasicLogger {
       "\nSuccess on Server AMQP Connection {0} on {1} after {2} retries" +
       
"\n*******************************************************************************************************************************\n",
 format = Message.Format.MESSAGE_FORMAT)
    void successReconnect(String name, String hostAndPort, int currentRetry);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 111004, value = "AddressFullPolicy clash on an anonymous 
producer between destinations {0}(addressFullPolicy={1}) and 
{2}(addressFullPolicy={3}). This could lead to semantic inconsistencies on your 
clients. Notice you could have other instances of this scenario however this 
message will only be logged once. log.debug output would show all instances of 
this event.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void incompatibleAddressFullMessagePolicy(String oldAddress, String 
oldPolicy, String newAddress, String newPolicy);
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 6be3a18..0457b76 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@@ -34,6 +35,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -57,6 +59,9 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
    private static final Logger log = 
Logger.getLogger(ProtonServerReceiverContext.class);
 
    protected SimpleString address;
+   protected SimpleString lastAddress;
+   protected AddressFullMessagePolicy lastAddressPolicy;
+   protected boolean addressAlreadyClashed = false;
 
 
    protected final Runnable spiFlow = this::sessionSPIFlow;
@@ -174,6 +179,10 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
    protected void actualDelivery(AMQPMessage message, Delivery delivery, 
Receiver receiver, Transaction tx) {
       try {
          if (sessionSPI != null) {
+            // message could be null on unit tests (Mocking from 
ProtonServerReceiverContextTest).
+            if (address == null && message != null) {
+               validateAddressOnAnonymousLink(message);
+            }
             sessionSPI.serverSend(this, tx, receiver, delivery, address, 
routingContext, message);
          }
       } catch (Exception e) {
@@ -184,6 +193,23 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
       }
    }
 
+   private void validateAddressOnAnonymousLink(AMQPMessage message) throws Exception {
+      SimpleString newAddress = message.getAddressSimpleString(); // on an anonymous link the target address comes from each message
+      if (newAddress != null && !newAddress.equals(lastAddress)) {
+         AddressFullMessagePolicy currentPolicy = sessionSPI.getProtocolManager().getServer().getPagingManager().getPageStore(newAddress).getAddressFullMessagePolicy();
+         if (lastAddressPolicy != null && lastAddressPolicy != currentPolicy) {
+            if (!addressAlreadyClashed) {
+               addressAlreadyClashed = true; // print the warning only once
+               ActiveMQAMQPProtocolLogger.LOGGER.incompatibleAddressFullMessagePolicy(lastAddress.toString(), "" + lastAddressPolicy, newAddress.toString(), "" + currentPolicy);
+            }
+            // every clash is reported at debug level; the new address is paired with its own (current) policy
+            log.debug("AddressFullPolicy clash between " + lastAddress + "/" + lastAddressPolicy + " and " + newAddress + "/" + currentPolicy);
+         }
+         this.lastAddress = newAddress; // remembered so the next message is compared against this address and policy
+         this.lastAddressPolicy = currentPolicy;
+      }
+   }
+
    public void deliveryFailed(Delivery delivery, Receiver receiver, Exception 
e) {
       connection.runNow(() -> {
          DeliveryState deliveryState = determineDeliveryState(((Source) 
receiver.getSource()),
@@ -262,7 +288,7 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
       connection.requireInHandler();
       // Use the SessionSPI to allocate producer credits, or default, always 
allocate credit.
       if (sessionSPI != null) {
-         sessionSPI.flow(address, creditRunnable);
+         sessionSPI.flow(address != null ? address : lastAddress, 
creditRunnable);
       } else {
          creditRunnable.run();
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
index 3489737..9586fa5 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java
@@ -121,4 +121,13 @@ public interface PagingManager extends ActiveMQComponent, 
HierarchicalRepository
     */
    void checkMemory(Runnable runWhenAvailable);
 
+
+   /**
+    * Use this when you have no reference to an address (anonymous AMQP producers, for example).
+    * @param runWhenAvailable task to run once the check allows it; this default implementation simply delegates to {@link #checkMemory(Runnable)}
+    */
+   default void checkStorage(Runnable runWhenAvailable) {
+      checkMemory(runWhenAvailable);
+   }
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 4f930b7..3e10291 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -120,6 +120,10 @@ public final class PagingManagerImpl implements 
PagingManager {
       this.managementAddress = managementAddress;
    }
 
+   public long getMaxSize() {
+      return maxSize;
+   }
+
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
                             final HierarchicalRepository<AddressSettings> 
addressSettingsRepository) {
       this(pagingSPI, addressSettingsRepository, -1, null);
@@ -255,6 +259,14 @@ public final class PagingManagerImpl implements 
PagingManager {
       runWhenAvailable.run();
    }
 
+   @Override
+   public void checkStorage(Runnable runWhenAvailable) {
+      if (diskFull) {
+         memoryCallback.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+         return;
+      }
+      runWhenAvailable.run();
+   }
 
    private void memoryReleased() {
       Runnable runnable;
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AnonymousProducerPageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AnonymousProducerPageTest.java
new file mode 100644
index 0000000..9e232c0
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/AnonymousProducerPageTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AnonymousProducerPageTest extends ActiveMQTestBase {
+
+   protected final String protocol;
+
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection getParams() {
+      return Arrays.asList(new Object[][]{
+         {"AMQP"}, {"CORE"}, {"OPENWIRE"}});
+   }
+
+   public AnonymousProducerPageTest(String protocol) {
+      this.protocol = protocol;
+   }
+
+   protected static final String NETTY_ACCEPTOR = "netty-acceptor";
+
+   ActiveMQServer server;
+
+   @Before
+   public void createServer() throws Exception {
+
+      int port = 5672;
+
+      this.server = addServer(this.createServer(true, true));
+
+      server.getConfiguration().getAddressesSettings().clear();
+      server.getConfiguration().addAddressesSetting("#", new 
AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
+
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server,
 port));
+      server.getConfiguration().setName(getName());
+      
server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory()
 + port);
+      
server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory()
 + port);
+      
server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory()
 + port);
+      
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory());
+      server.getConfiguration().setJMXManagementEnabled(true);
+      server.getConfiguration().setMessageExpiryScanPeriod(100);
+      server.start();
+   }
+
+   protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer 
server, int port) {
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, 
getConfiguredProtocols());
+      HashMap<String, Object> amqpParams = new HashMap<>();
+      configureAMQPAcceptorParameters(amqpParams);
+      TransportConfiguration tc = new 
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, 
amqpParams);
+      configureAMQPAcceptorParameters(tc);
+      return tc;
+   }
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      // None by default
+   }
+
+   protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
+      // None by default
+   }
+
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test(timeout = 60_000)
+   public void testNotBlockOnGlobalMaxSizeWithAnonymousProduce() throws Exception {
+      final int MSG_SIZE = 1000;
+      final StringBuilder builder = new StringBuilder();
+      for (int i = 0; i < MSG_SIZE; i++) {
+         builder.append('0');
+      }
+      final String data = builder.toString();
+      final int MSG_COUNT = 3_000;
+
+      // account for the whole max size (plus some extra) up front, so the paging manager is already over its limit
+      server.getPagingManager().addSize((int) ((PagingManagerImpl) server.getPagingManager()).getMaxSize());
+      server.getPagingManager().addSize(100_000);
+
+      server.getAddressSettingsRepository().addMatch("blockedQueue", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");
+      try (Connection connection = factory.createConnection()) { // closed even when a send, commit or assertion fails
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer producer = session.createProducer(null); // null destination = anonymous producer
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         javax.jms.Queue jmsQueue = session.createQueue(getName());
+
+         for (int i = 0; i < MSG_COUNT; i++) {
+            TextMessage message = session.createTextMessage(data);
+            producer.send(jmsQueue, message);
+         }
+         session.commit();
+         if (protocol.equals("AMQP")) {
+            // this is only valid for AMQP
+            validatePolicyMismatch(session, producer);
+         }
+      }
+   }
+
+   private void validatePolicyMismatch(Session session, MessageProducer 
producer) throws JMSException {
+      AssertionLoggerHandler.startCapture();
+      try {
+         producer.send(session.createQueue("blockedQueue"), 
session.createMessage());
+         session.commit();
+         Assert.assertTrue(AssertionLoggerHandler.findText("AMQ111004"));
+         AssertionLoggerHandler.clear();
+         producer.send(session.createQueue(getName()), 
session.createMessage());
+         session.commit();
+         Assert.assertFalse("The warning should be printed only once", 
AssertionLoggerHandler.findText("AMQ111004"));
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
+
+}

Reply via email to