[
https://issues.apache.org/jira/browse/ARTEMIS-4206?focusedWorklogId=855099&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-855099
]
ASF GitHub Bot logged work on ARTEMIS-4206:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Apr/23 16:08
Start Date: 05/Apr/23 16:08
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on code in PR #4418:
URL: https://github.com/apache/activemq-artemis/pull/4418#discussion_r1158730852
##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageFrozenTest.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.interruptlm;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.Locale;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.TcpProxy;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LargeMessageFrozenTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ TcpProxy proxy;
+
+ ActiveMQServer server;
+
+ @Before
+ public void startServer() throws Exception {
+ server = createServer(true, true);
+ server.getConfiguration().addAcceptorConfiguration("alternate",
"tcp://localhost:44444?amqpIdleTimeout=100");
+ server.start();
+
+ startProxy();
+ }
+
+ private void startProxy() {
+ proxy = new TcpProxy("localhost", 44444, 33333, false);
+ proxy.startProxy();
+ }
+
+ @After
+ public void stopProxy() throws Exception {
+ proxy.stopProxy(5000);
+ }
+
+ @Test
+ public void testCore() throws Exception {
+ testFreeze("CORE");
+ }
+
+ @Test
+ public void testAMQP() throws Exception {
+ testFreeze("AMQP");
+ }
+
+ public void testFreeze(String protocol) throws Exception {
+
+ ConnectionFactory factory;
+ switch (protocol.toUpperCase(Locale.ROOT)) {
+ case "CORE":
+ ActiveMQConnectionFactory artemisfactory = new
ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
+ Assert.assertEquals(100,
artemisfactory.getServerLocator().getClientFailureCheckPeriod());
+ Assert.assertEquals(1000,
artemisfactory.getServerLocator().getConnectionTTL());
+ Assert.assertEquals(1000,
artemisfactory.getServerLocator().getConsumerWindowSize());
+ factory = artemisfactory;
+ break;
+ case "AMQP":
+ JmsConnectionFactory qpidFactory = new
JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=300&jms.prefetchPolicy.all=2");
+ factory = qpidFactory;
+ break;
+ default:
+ factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:33333");
+ }
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue =
server.createQueue(new
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+ Connection connection = factory.createConnection();
+ runAfter(connection::close);
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getName());
+
+ Assert.assertEquals(1, proxy.getInboundHandlers().size());
+ Assert.assertEquals(1, proxy.getOutbounddHandlers().size());
+
+ String body;
+ {
+ StringBuffer buffer = new StringBuffer();
+ while (buffer.length() < 10 * 1024 * 1024) {
+ buffer.append("Not so big, but big!!");
+ }
+ body = buffer.toString();
+ }
+
+ int NUMBER_OF_MESSAGES = 10;
+
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage(body));
+ }
+ session.commit();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ boolean failed = false;
+
+ for (int repeat = 0; repeat < 5; repeat++) {
+ try {
+ for (int i = 0; i < 1; i++) {
+ Assert.assertNotNull(consumer.receive(1000));
+ }
+ proxy.stopAllHandlers();
+ consumer.receive(100);
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // just
to force an exception
+ } catch (Exception expected) {
+ logger.info(expected.getMessage(), expected);
+ failed = true;
+ }
+
+ Assert.assertTrue(failed);
+ connection = factory.createConnection();
+ connection.start();
+ runAfter(connection::close);
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ consumer = session.createConsumer(queue);
+ }
+
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(body, message.getText());
+ session.commit();
+ }
+
+ Wait.assertEquals(0, () -> {
+ System.gc();
+ return
server.getConfiguration().getLargeMessagesLocation().listFiles().length;
+ });
+ }
+
+ @Test
+ public void testAutoAckAMQP() throws Exception {
+ testFreezeAutoAck("AMQP");
+ }
+
+ public void testFreezeAutoAck(String protocol) throws Exception {
+
+ ConnectionFactory factory;
+ switch (protocol.toUpperCase(Locale.ROOT)) {
+ case "CORE":
+ ActiveMQConnectionFactory artemisfactory = new
ActiveMQConnectionFactory("tcp://localhost:33333?connectionTTL=1000&clientFailureCheckPeriod=100&consumerWindowSize=1000");
+ Assert.assertEquals(100,
artemisfactory.getServerLocator().getClientFailureCheckPeriod());
+ Assert.assertEquals(1000,
artemisfactory.getServerLocator().getConnectionTTL());
+ Assert.assertEquals(1000,
artemisfactory.getServerLocator().getConsumerWindowSize());
+ factory = artemisfactory;
+ break;
+ case "AMQP":
+ JmsConnectionFactory qpidFactory = new
JmsConnectionFactory("amqp://localhost:33333?amqp.idleTimeout=300&jms.prefetchPolicy.all=2");
+ factory = qpidFactory;
+ break;
+ default:
+ factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:33333");
+ }
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue =
server.createQueue(new
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+ Connection connection = factory.createConnection();
+ runAfter(connection::close);
+ Session sessionConsumer = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = sessionConsumer.createQueue(getName());
+
+ Assert.assertEquals(1, proxy.getInboundHandlers().size());
+ Assert.assertEquals(1, proxy.getOutbounddHandlers().size());
+
+ String body;
+ {
+ StringBuffer buffer = new StringBuffer();
+ while (buffer.length() < 10 * 1024 * 1024) {
+ buffer.append("Not so big, but big!!");
+ }
+ body = buffer.toString();
+ }
+
+ int NUMBER_OF_MESSAGES = 40;
+
+ try (Session sessionProducer = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE)) {
+ MessageProducer producer = sessionProducer.createProducer(queue);
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(sessionConsumer.createTextMessage(body));
+ }
+ sessionProducer.commit();
+ }
+
+ MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+ connection.start();
+
+ boolean failed = false;
+
+ try {
+ for (int i = 0; i < 10; i++) {
+ consumer.receive(5000);
+ }
+ proxy.stopAllHandlers();
+ consumer.receive(100);
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // just to
force an exception
+ } catch (Exception expected) {
+ logger.info(expected.getMessage(), expected);
+ failed = true;
+ }
+
+ Wait.assertEquals(0, () ->
server.getActiveMQServerControl().getConnectionCount());
+
+ Thread.sleep(500);
Review Comment:
This sleep is not needed, I'm removing it.
Issue Time Tracking
-------------------
Worklog Id: (was: 855099)
Time Spent: 4h 50m (was: 4h 40m)
> Unreferenced AMQP Large Messages are not removed
> ------------------------------------------------
>
> Key: ARTEMIS-4206
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4206
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.29.0
>
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> Say you crashed the server after the ack, but before the file.remove and the
> journal record removal.
> The AMQP Large Message may not be removed right away, requiring a restart of
> the broker.
> At this point this is really caused by ARTEMIS-4193 and only affects 2.29.0,
> not any previous versions.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)