This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 08a81a7402 I had a mistake reapplying the large message fix. I'm still
reworking the commit.
08a81a7402 is described below
commit 08a81a7402a3981d59972b45ef423f2682496f38
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Mar 6 20:05:07 2023 -0500
I had a mistake reapplying the large message fix. I'm still reworking the
commit.
I apologize for this spam
This reverts commit 14536bf311e28f4ae825352f8a2fd49a3b9650bf.
---
.../impl/journal/JournalStorageManager.java | 17 +-
tests/soak-tests/pom.xml | 25 ---
.../resources/servers/lminterrupt/management.xml | 52 ------
.../activemq/artemis/tests/soak/SoakTestBase.java | 32 ----
.../interruptlm/LargeMessageInterruptTest.java | 199 ---------------------
5 files changed, 5 insertions(+), 320 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 476f0776f2..aaecba0702 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -447,14 +447,7 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
public long storePendingLargeMessage(final long messageID) throws Exception
{
try (ArtemisCloseable lock = closeableReadLock()) {
long recordID = generateID();
- if (logger.isTraceEnabled()) {
- logger.trace("Storing pending large message for messageID={} on
recordID={}", messageID, recordID);
- }
- // the pending record has to be stored and synced before the large
message file is created
- messageJournal.appendAddRecord(recordID,
JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new
PendingLargeMessageEncoding(messageID), true, null);
- if (logger.isTraceEnabled()) {
- logger.trace("...Stored pending large message for messageID={} on
recordID={}", messageID, recordID);
- }
+ messageJournal.appendAddRecord(recordID,
JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new
PendingLargeMessageEncoding(messageID), true, getContext(true));
return recordID;
}
@@ -576,6 +569,10 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
}
}
+ // We do this here to avoid a case where the replication gets a list
without this file
+ // to avoid a race
+ largeMessage.validateFile();
+
if (largeMessage.toMessage().isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);
@@ -583,10 +580,6 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
largeMessage.setPendingRecordID(pendingRecordID);
}
- // the file has to be created after te record is stored
- largeMessage.validateFile();
-
-
return largeMessage;
}
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index b45ea34b33..4b3e65f42f 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -247,31 +247,6 @@
</args>
</configuration>
</execution>
- <!-- Used on LargeMessageInterruptTest -->
- <execution>
- <phase>test-compile</phase>
- <id>create-lminterrupt</id>
- <goals>
- <goal>create</goal>
- </goals>
- <configuration>
- <role>amq</role>
- <user>artemis</user>
- <password>artemis</password>
- <allowAnonymous>true</allowAnonymous>
- <noWeb>false</noWeb>
- <instance>${basedir}/target/lminterrupt</instance>
-
<configuration>${basedir}/target/classes/servers/lminterrupt</configuration>
- <args>
- <arg>--java-options</arg>
- <arg>-Djava.rmi.server.hostname=localhost</arg>
- <arg>--queues</arg>
- <arg>LargeMessageInterruptTest</arg>
- <arg>--name</arg>
- <arg>lminterrupt</arg>
- </args>
- </configuration>
- </execution>
</executions>
</plugin>
diff --git
a/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml
b/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml
deleted file mode 100644
index 1d38e28ac9..0000000000
--- a/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<management-context xmlns="http://activemq.apache.org/schema">
- <connector connector-port="1099"/>
- <authorisation>
- <allowlist>
- <entry domain="hawtio"/>
- </allowlist>
- <default-access>
- <access method="list*" roles="amq"/>
- <access method="get*" roles="amq"/>
- <access method="is*" roles="amq"/>
- <access method="set*" roles="amq"/>
- <access method="*" roles="amq"/>
- </default-access>
- <role-access>
- <match domain="org.apache.activemq.artemis">
- <access method="list*" roles="amq"/>
- <access method="get*" roles="amq"/>
- <access method="is*" roles="amq"/>
- <access method="set*" roles="amq"/>
- <!-- Note count and browse are need to access the browse tab in
the console-->
- <access method="browse*" roles="amq"/>
- <access method="count*" roles="amq"/>
- <access method="*" roles="amq"/>
- </match>
- <!--example of how to configure a specific object-->
- <!--<match domain="org.apache.activemq.artemis"
key="subcomponent=queues">
- <access method="list*" roles="view,update,amq"/>
- <access method="get*" roles="view,update,amq"/>
- <access method="is*" roles="view,update,amq"/>
- <access method="set*" roles="update,amq"/>
- <access method="*" roles="amq"/>
- </match>-->
- </role-access>
- </authorisation>
-</management-context>
\ No newline at end of file
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
index 02dd357eff..40b54ef2cd 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.tests.soak;
import javax.management.MBeanServerInvocationHandler;
-import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
@@ -29,11 +28,8 @@ import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Set;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.cli.commands.Stop;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -156,34 +152,6 @@ public class SoakTestBase extends ActiveMQTestBase {
throw lastException;
}
- protected static QueueControl getQueueControl(String uri,
- ObjectNameBuilder builder,
- String address,
- String queueName,
- RoutingType routingType,
- long timeout) throws
Throwable {
- long expireLoop = System.currentTimeMillis() + timeout;
- Throwable lastException = null;
- do {
- try {
- JMXConnector connector = newJMXFactory(uri);
-
- ObjectName objectQueueName =
builder.getQueueObjectName(SimpleString.toSimpleString(address),
SimpleString.toSimpleString(queueName), routingType);
-
- QueueControl queueControl =
MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(),
objectQueueName, QueueControl.class, false);
- queueControl.getMessagesAcknowledged(); // making one call
- return queueControl;
- } catch (Throwable e) {
- logger.warn(e.getMessage(), e);
- lastException = e;
- Thread.sleep(500);
- }
- }
- while (expireLoop > System.currentTimeMillis());
-
- throw lastException;
- }
-
protected static JMXConnector getJmxConnector(String hostname, int port)
throws MalformedURLException {
// Without this, the RMI server would bind to the default interface IP
(the user's local IP mostly)
System.setProperty("java.rmi.server.hostname", hostname);
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
deleted file mode 100644
index e784394933..0000000000
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.soak.interruptlm;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.tests.soak.SoakTestBase;
-import org.apache.activemq.artemis.tests.util.CFUtil;
-import org.apache.activemq.artemis.tests.util.Wait;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// This is used to kill a server and make sure the server will remove any
pending files.
-public class LargeMessageInterruptTest extends SoakTestBase {
-
- public static final String SERVER_NAME_0 = "lminterrupt";
- private static final String JMX_SERVER_HOSTNAME = "localhost";
- private static final int JMX_SERVER_PORT_0 = 1099;
- private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- static String liveURI = "service:jmx:rmi:///jndi/rmi://" +
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
- static ObjectNameBuilder liveNameBuilder =
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(),
"lminterrupt", true);
- Process serverProcess;
-
- public ConnectionFactory createConnectionFactory(String protocol) {
- return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
- }
-
- @Before
- public void before() throws Exception {
- cleanupData(SERVER_NAME_0);
- serverProcess = startServer(SERVER_NAME_0, 0, 30000);
- disableCheckThread();
- }
-
- @Test
- public void testInterruptLargeMessageAMQPTX() throws Throwable {
- testInterruptLM("AMQP", true);
- }
-
- @Test
- public void testInterruptLargeMessageCORETX() throws Throwable {
- testInterruptLM("CORE", true);
- }
-
- @Test
- public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
- testInterruptLM("AMQP", false);
- }
-
- @Test
- public void testInterruptLargeMessageCORENonTX() throws Throwable {
- testInterruptLM("CORE", false);
- }
-
- private void testInterruptLM(String protocol, boolean tx) throws Throwable {
- final int BODY_SIZE = 500 * 1024;
- final int NUMBER_OF_MESSAGES = 10; // this is per producer
- final int SENDING_THREADS = 10;
- CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
- final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
- final AtomicInteger produced = new AtomicInteger(0);
- final ConnectionFactory factory = createConnectionFactory(protocol);
- final AtomicInteger errors = new AtomicInteger(0); // I don't expect
many errors since this test is disconnecting and reconnecting the server
- final CountDownLatch killAt = new CountDownLatch(40);
-
- ExecutorService executorService =
Executors.newFixedThreadPool(SENDING_THREADS);
- runAfter(executorService::shutdownNow);
-
- String queueName = "LargeMessageInterruptTest";
-
- String body;
-
- {
- StringBuffer buffer = new StringBuffer();
- while (buffer.length() < BODY_SIZE) {
- buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T
REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I
DON'T REALLY CARE ");
- }
- body = buffer.toString();
- }
-
- for (int i = 0; i < SENDING_THREADS; i++) {
- executorService.execute(() -> {
- int numberOfMessages = 0;
- try {
- Connection connection = factory.createConnection();
- Session session = connection.createSession(tx, tx ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer =
session.createProducer(session.createQueue(queueName));
-
- startFlag.await(10, TimeUnit.SECONDS);
- while (numberOfMessages < NUMBER_OF_MESSAGES) {
- try {
- producer.send(session.createTextMessage(body));
- if (tx) {
- session.commit();
- }
- produced.incrementAndGet();
- killAt.countDown();
- if (numberOfMessages++ % 10 == 0) {
- logger.info("Sent {}", numberOfMessages);
- }
- } catch (Exception e) {
- logger.warn(e.getMessage(), e);
-
- logger.warn(e.getMessage(), e);
- try {
- connection.close();
- } catch (Throwable ignored) {
- }
-
- for (int retryNumber = 0; retryNumber < 100;
retryNumber++) {
- try {
- Connection ctest = factory.createConnection();
- ctest.close();
- break;
- } catch (Throwable retry) {
- Thread.sleep(100);
- }
- }
-
- connection = factory.createConnection();
- session = connection.createSession(tx, tx ?
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- producer =
session.createProducer(session.createQueue(queueName));
- connection.start();
-
- }
- }
- } catch (Exception e) {
- logger.warn("Error getting the initial connection", e);
- errors.incrementAndGet();
- }
-
- logger.info("Done sending");
- done.countDown();
- });
- }
-
- Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
- serverProcess.destroyForcibly();
- serverProcess = startServer(SERVER_NAME_0, 0, 0);
- QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder,
queueName, queueName, RoutingType.ANYCAST, 5000);
-
- Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
- Assert.assertEquals(0, errors.get());
-
- long numberOfMessages = queueControl.getMessageCount();
- logger.info("there are {} messages", numberOfMessages);
-
- try (Connection connection = factory.createConnection()) {
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
- connection.start();
- for (int i = 0; i < numberOfMessages; i++) {
- Message message = consumer.receive(5000);
- Assert.assertNotNull(message);
- }
- }
-
- File lmFolder = new File(getServerLocation(SERVER_NAME_0) +
"/data/large-messages");
- Assert.assertTrue(lmFolder.exists());
- Wait.assertEquals(0, () -> lmFolder.listFiles().length);
-
- }
-
-}
\ No newline at end of file