This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new eb11b044af ARTEMIS-4108 AMQP Drain fails under load with Large Messages
eb11b044af is described below

commit eb11b044af135856d0cc9870dd8a34fb90e6ba21
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Dec 5 13:00:50 2022 -0500

    ARTEMIS-4108 AMQP Drain fails under load with Large Messages
---
 .../amqp/proton/ProtonServerSenderContext.java     |  22 ++-
 .../tests/soak/client/LargeMessageSoakTest.java    | 165 +++++++++++++++++++++
 2 files changed, 185 insertions(+), 2 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index f73c7b41f8..e8654b06e4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -129,6 +129,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    // as large message could be interrupted due to flow control and resumed at the same message
    volatile boolean hasLarge = false;
    volatile LargeMessageDeliveryContext pendingLargeMessage = null;
+   volatile Runnable afterLargeMessage;
 
 
    private int credits = 0;
@@ -177,6 +178,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    @Override
    public void onFlow(int currentCredits, boolean drain) {
+
+      if (logger.isDebugEnabled()) {
+         logger.debug("flow {}, draing={}", currentCredits, drain);
+      }
       connection.requireInHandler();
 
       setupCredit();
@@ -191,8 +196,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                public void run() {
                   try {
                      connection.runNow(() -> {
-                        plugSender.reportDrained();
-                        setupCredit();
+                        if (pendingLargeMessage != null) {
+                           afterLargeMessage = () -> drained(plugSender);
+                        } else {
+                           drained(plugSender);
+                        }
                      });
                   } finally {
                      draining.set(false);
@@ -205,6 +213,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       }
    }
 
+   private void drained(ProtonServerSenderContext sender) {
+      sender.reportDrained();
+      setupCredit();
+   }
+
    public boolean hasCredits() {
       if (hasLarge) {
          // we will resume accepting once the large message is finished
@@ -786,6 +799,11 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
    private void finishLargeMessage() {
       lmUsageDown();
+      Runnable localRunnable = afterLargeMessage;
+      afterLargeMessage = null;
+      if (localRunnable != null) {
+         localRunnable.run();
+      }
       pendingLargeMessage = null;
       hasLarge = false;
       brokerConsumer.promptDelivery();
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/LargeMessageSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/LargeMessageSoakTest.java
new file mode 100644
index 0000000000..7e625c1fa6
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/LargeMessageSoakTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LargeMessageSoakTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      this.server = this.createServer(true, true);
+      server.start();
+   }
+
+   @Test
+   public void testAMQP() throws Exception {
+      testSendReceive("AMQP");
+   }
+
+   @Test
+   public void testCORE() throws Exception {
+      testSendReceive("CORE");
+   }
+
+   @Test
+   public void testOpenWire() throws Exception {
+      testSendReceive("OPENWIRE");
+   }
+
+   public void testSendReceive(String protocol) throws Exception {
+      AtomicInteger errors = new AtomicInteger(0);
+
+      final int THREADS = 5;
+      final int MESSAGE_COUNT = 5;
+      final int MESSAGE_SIZE = 10000000;
+
+      ExecutorService executorService = Executors.newFixedThreadPool(THREADS * 
2);
+      runAfter(executorService::shutdownNow);
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+
+      final Connection connectionConsumer = factory.createConnection();
+      connectionConsumer.start();
+      final Connection connectionProducer = factory.createConnection();
+
+      runAfter(connectionProducer::close);
+      runAfter(connectionConsumer::close);
+
+      final String largetext;
+
+      {
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < MESSAGE_SIZE) {
+            buffer.append("Lorem Ypsum blablabla blabalbala I don't care 
whatever it is in that thing...");
+         }
+         largetext = buffer.toString();
+      }
+
+      CountDownLatch done = new CountDownLatch(THREADS * 2);
+
+
+      for (int t = 0; t < THREADS; t++) {
+         final int localT = t;
+         executorService.execute(() -> {
+            try {
+               try (Session session = connectionConsumer.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
+                  MessageConsumer consumer = 
session.createConsumer(session.createQueue("TEST"));
+                  for (int i = 0; i < MESSAGE_COUNT && errors.get() == 0; i++) 
{
+                     TextMessage textMessage;
+                     do {
+                        textMessage = (TextMessage) consumer.receive(300);
+                        if (textMessage == null) {
+                           if (logger.isTraceEnabled()) {
+                              logger.trace("Retrying on thread consumer {}", 
localT);
+                           }
+                        }
+                     }
+                     while (textMessage == null);
+
+
+                     Assert.assertNotNull(textMessage);
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Consumer Thread {} received {} messages, 
protocol={}", localT, i, protocol);
+                     }
+                     // Since all messages come from the same queue on all consumers, this is the only assertion possible for the message
+                     Assert.assertEquals(largetext, textMessage.getText());
+                  }
+               }
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      for (int t = 0; t < THREADS; t++) {
+         final int localT = t;
+         executorService.execute(() -> {
+            try {
+               try (Session session = connectionProducer.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
+                  MessageProducer producer = 
session.createProducer(session.createQueue("TEST"));
+                  for (int i = 0; i < MESSAGE_COUNT && errors.get() == 0; i++) 
{
+                     TextMessage textMessage = 
session.createTextMessage(largetext);
+                     producer.send(textMessage);
+                     if (logger.isDebugEnabled() && i % 10 == 0) {
+                        logger.debug("Producing thread {} sent {} messages, 
protocol={}", localT, i, protocol);
+                     }
+                  }
+               }
+            } catch (Throwable e) {
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+
+      Assert.assertTrue(done.await(5, TimeUnit.MINUTES));
+      Assert.assertEquals(0, errors.get());
+   }
+
+
+}

Reply via email to