[ 
https://issues.apache.org/jira/browse/ARTEMIS-4206?focusedWorklogId=855100&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-855100
 ]

ASF GitHub Bot logged work on ARTEMIS-4206:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Apr/23 16:14
            Start Date: 05/Apr/23 16:14
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4418:
URL: https://github.com/apache/activemq-artemis/pull/4418#discussion_r1158738044


##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageFrozenTest.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.Locale;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.TcpProxy;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LargeMessageFrozenTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   TcpProxy proxy;
+
+   ActiveMQServer server;
+
+   @Before
+   public void startServer() throws Exception {
+      server = createServer(true, true);
+      server.getConfiguration().addAcceptorConfiguration("alternate", 
"tcp://localhost:44444?amqpIdleTimeout=100");
+      server.start();
+
+      startProxy();
+   }
+
+   private void startProxy() {
+      proxy = new TcpProxy("localhost", 44444, 33333, false);
+      proxy.startProxy();
+   }
+
+   @After
+   public void stopProxy() throws Exception {
+      proxy.stopProxy(5000);
+   }
+
+   @Test
+   public void testCore() throws Exception {
+      testFreeze("CORE");
+   }
+
+   @Test
+   public void testAMQP() throws Exception {
+      testFreeze("AMQP");
+   }
+
+   public void testFreeze(String protocol) throws Exception {
+
+      ConnectionFactory factory;
+      switch (protocol.toUpperCase(Locale.ROOT)) {
+         case "CORE":
+            ActiveMQConnectionFactory artemisfactory = new 
ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
+            Assert.assertEquals(100, 
artemisfactory.getServerLocator().getClientFailureCheckPeriod());
+            Assert.assertEquals(1000, 
artemisfactory.getServerLocator().getConnectionTTL());
+            Assert.assertEquals(1000, 
artemisfactory.getServerLocator().getConsumerWindowSize());
+            factory = artemisfactory;
+            break;
+         case "AMQP":
+            JmsConnectionFactory qpidFactory = new 
JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=300&jms.prefetchPolicy.all=2");
+            factory = qpidFactory;
+            break;
+         default:
+            factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:33333");
+      }
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.createQueue(new 
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+      Connection connection = factory.createConnection();
+      runAfter(connection::close);
+      Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+      Queue queue = session.createQueue(getName());
+
+      Assert.assertEquals(1, proxy.getInboundHandlers().size());
+      Assert.assertEquals(1, proxy.getOutbounddHandlers().size());
+
+      String body;
+      {
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < 10 * 1024 * 1024) {
+            buffer.append("Not so big, but big!!");
+         }
+         body = buffer.toString();
+      }
+
+      int NUMBER_OF_MESSAGES = 10;
+
+      MessageProducer producer = session.createProducer(queue);
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         producer.send(session.createTextMessage(body));
+      }
+      session.commit();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      connection.start();
+
+      boolean failed = false;
+
+      for (int repeat = 0; repeat < 5; repeat++) {
+         try {
+            for (int i = 0; i < 1; i++) {
+               Assert.assertNotNull(consumer.receive(1000));
+            }
+            proxy.stopAllHandlers();
+            consumer.receive(100);
+            connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // just 
to force an exception
+         } catch (Exception expected) {
+            logger.info(expected.getMessage(), expected);
+            failed = true;
+         }
+
+         Assert.assertTrue(failed);
+         connection = factory.createConnection();
+         connection.start();
+         runAfter(connection::close);
+         session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         consumer = session.createConsumer(queue);
+      }
+
+
+      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(body, message.getText());
+         session.commit();
+      }
+
+      Wait.assertEquals(0, () -> {
+         System.gc();
+         return 
server.getConfiguration().getLargeMessagesLocation().listFiles().length;
+      });
+   }
+
+   @Test
+   public void testAutoAckAMQP() throws Exception {
+      testFreezeAutoAck("AMQP");
+   }
+
+   public void testFreezeAutoAck(String protocol) throws Exception {
+
+      ConnectionFactory factory;
+      switch (protocol.toUpperCase(Locale.ROOT)) {
+         case "CORE":
+            ActiveMQConnectionFactory artemisfactory = new 
ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
+            Assert.assertEquals(100, 
artemisfactory.getServerLocator().getClientFailureCheckPeriod());
+            Assert.assertEquals(1000, 
artemisfactory.getServerLocator().getConnectionTTL());
+            Assert.assertEquals(1000, 
artemisfactory.getServerLocator().getConsumerWindowSize());
+            factory = artemisfactory;
+            break;
+         case "AMQP":
+            JmsConnectionFactory qpidFactory = new 
JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=300&jms.prefetchPolicy.all=2");
+            factory = qpidFactory;
+            break;
+         default:
+            factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:33333");
+      }
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.createQueue(new 
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+      Connection connection = factory.createConnection();
+      runAfter(connection::close);
+      Session sessionConsumer = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue queue = sessionConsumer.createQueue(getName());
+
+      Assert.assertEquals(1, proxy.getInboundHandlers().size());
+      Assert.assertEquals(1, proxy.getOutbounddHandlers().size());
+
+      String body;
+      {
+         StringBuffer buffer = new StringBuffer();
+         while (buffer.length() < 10 * 1024 * 1024) {
+            buffer.append("Not so big, but big!!");
+         }
+         body = buffer.toString();
+      }
+
+      int NUMBER_OF_MESSAGES = 40;
+
+      try (Session sessionProducer = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE)) {
+         MessageProducer producer = sessionProducer.createProducer(queue);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(sessionConsumer.createTextMessage(body));
+         }
+         sessionProducer.commit();
+      }
+
+      MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+      connection.start();
+
+      boolean failed = false;
+
+      try {
+         for (int i = 0; i < 10; i++) {
+            consumer.receive(5000);
+         }
+         proxy.stopAllHandlers();
+         consumer.receive(100);
+         connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // just to 
force an exception
+      } catch (Exception expected) {
+         logger.info(expected.getMessage(), expected);
+         failed = true;
+      }
+
+      Wait.assertEquals(0, () -> 
server.getActiveMQServerControl().getConnectionCount());
+
+      Thread.sleep(500);

Review Comment:
   ... removed





Issue Time Tracking
-------------------

    Worklog Id:     (was: 855100)
    Time Spent: 5h  (was: 4h 50m)

> Unreferenced AMQP Large Messages are not removed
> ------------------------------------------------
>
>                 Key: ARTEMIS-4206
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4206
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.29.0
>
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> Say you crashed the server after the ack, and before the file.remove, and the 
> journal. record removal.
> The AMQP Large Message may not be removed right away, requiring a restart of 
> the broker.
> At this point this is really caused by ARTEMIS-4193  and only affected 2.29.0 
> and no previous versions



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to