This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 08a81a7402 I had a mistake reapplying the large message fix. I'm still 
reworking the commit.
08a81a7402 is described below

commit 08a81a7402a3981d59972b45ef423f2682496f38
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Mar 6 20:05:07 2023 -0500

    I had a mistake reapplying the large message fix. I'm still reworking the 
commit.
    
    I apologize for this spam
    
    This reverts commit 14536bf311e28f4ae825352f8a2fd49a3b9650bf.
---
 .../impl/journal/JournalStorageManager.java        |  17 +-
 tests/soak-tests/pom.xml                           |  25 ---
 .../resources/servers/lminterrupt/management.xml   |  52 ------
 .../activemq/artemis/tests/soak/SoakTestBase.java  |  32 ----
 .../interruptlm/LargeMessageInterruptTest.java     | 199 ---------------------
 5 files changed, 5 insertions(+), 320 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 476f0776f2..aaecba0702 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -447,14 +447,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
    public long storePendingLargeMessage(final long messageID) throws Exception 
{
       try (ArtemisCloseable lock = closeableReadLock()) {
          long recordID = generateID();
-         if (logger.isTraceEnabled()) {
-            logger.trace("Storing pending large message for messageID={} on 
recordID={}", messageID, recordID);
-         }
-         // the pending record has to be stored and synced before the large 
message file is created
-         messageJournal.appendAddRecord(recordID, 
JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new 
PendingLargeMessageEncoding(messageID), true, null);
-         if (logger.isTraceEnabled()) {
-            logger.trace("...Stored pending large message for messageID={} on 
recordID={}", messageID, recordID);
-         }
+         messageJournal.appendAddRecord(recordID, 
JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new 
PendingLargeMessageEncoding(messageID), true, getContext(true));
 
          return recordID;
       }
@@ -576,6 +569,10 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
          }
       }
 
+      // We do this here to avoid a case where the replication gets a list 
without this file
+      // to avoid a race
+      largeMessage.validateFile();
+
       if (largeMessage.toMessage().isDurable()) {
          // We store a marker on the journal that the large file is pending
          long pendingRecordID = storePendingLargeMessage(id);
@@ -583,10 +580,6 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
          largeMessage.setPendingRecordID(pendingRecordID);
       }
 
-      // the file has to be created after te record is stored
-      largeMessage.validateFile();
-
-
       return largeMessage;
    }
 
diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml
index b45ea34b33..4b3e65f42f 100644
--- a/tests/soak-tests/pom.xml
+++ b/tests/soak-tests/pom.xml
@@ -247,31 +247,6 @@
                      </args>
                   </configuration>
                </execution>
-               <!-- Used on LargeMessageInterruptTest -->
-               <execution>
-                  <phase>test-compile</phase>
-                  <id>create-lminterrupt</id>
-                  <goals>
-                     <goal>create</goal>
-                  </goals>
-                  <configuration>
-                     <role>amq</role>
-                     <user>artemis</user>
-                     <password>artemis</password>
-                     <allowAnonymous>true</allowAnonymous>
-                     <noWeb>false</noWeb>
-                     <instance>${basedir}/target/lminterrupt</instance>
-                     
<configuration>${basedir}/target/classes/servers/lminterrupt</configuration>
-                     <args>
-                        <arg>--java-options</arg>
-                        <arg>-Djava.rmi.server.hostname=localhost</arg>
-                        <arg>--queues</arg>
-                        <arg>LargeMessageInterruptTest</arg>
-                        <arg>--name</arg>
-                        <arg>lminterrupt</arg>
-                     </args>
-                  </configuration>
-               </execution>
 
             </executions>
          </plugin>
diff --git 
a/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml 
b/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml
deleted file mode 100644
index 1d38e28ac9..0000000000
--- a/tests/soak-tests/src/main/resources/servers/lminterrupt/management.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements. See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License. You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<management-context xmlns="http://activemq.apache.org/schema";>
-   <connector connector-port="1099"/>
-   <authorisation>
-      <allowlist>
-         <entry domain="hawtio"/>
-      </allowlist>
-      <default-access>
-         <access method="list*" roles="amq"/>
-         <access method="get*" roles="amq"/>
-         <access method="is*" roles="amq"/>
-         <access method="set*" roles="amq"/>
-         <access method="*" roles="amq"/>
-      </default-access>
-      <role-access>
-         <match domain="org.apache.activemq.artemis">
-            <access method="list*" roles="amq"/>
-            <access method="get*" roles="amq"/>
-            <access method="is*" roles="amq"/>
-            <access method="set*" roles="amq"/>
-            <!-- Note count and browse are need to access the browse tab in 
the console-->
-            <access method="browse*" roles="amq"/>
-            <access method="count*" roles="amq"/>
-            <access method="*" roles="amq"/>
-         </match>
-         <!--example of how to configure a specific object-->
-         <!--<match domain="org.apache.activemq.artemis" 
key="subcomponent=queues">
-            <access method="list*" roles="view,update,amq"/>
-            <access method="get*" roles="view,update,amq"/>
-            <access method="is*" roles="view,update,amq"/>
-            <access method="set*" roles="update,amq"/>
-            <access method="*" roles="amq"/>
-         </match>-->
-      </role-access>
-   </authorisation>
-</management-context>
\ No newline at end of file
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
index 02dd357eff..40b54ef2cd 100644
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/SoakTestBase.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.tests.soak;
 
 import javax.management.MBeanServerInvocationHandler;
-import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
@@ -29,11 +28,8 @@ import java.net.MalformedURLException;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.cli.commands.Stop;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -156,34 +152,6 @@ public class SoakTestBase extends ActiveMQTestBase {
       throw lastException;
    }
 
-   protected static QueueControl getQueueControl(String uri,
-                                                  ObjectNameBuilder builder,
-                                                  String address,
-                                                  String queueName,
-                                                  RoutingType routingType,
-                                                  long timeout) throws 
Throwable {
-      long expireLoop = System.currentTimeMillis() + timeout;
-      Throwable lastException = null;
-      do {
-         try {
-            JMXConnector connector = newJMXFactory(uri);
-
-            ObjectName objectQueueName = 
builder.getQueueObjectName(SimpleString.toSimpleString(address), 
SimpleString.toSimpleString(queueName), routingType);
-
-            QueueControl queueControl = 
MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(),
 objectQueueName, QueueControl.class, false);
-            queueControl.getMessagesAcknowledged(); // making one call
-            return queueControl;
-         } catch (Throwable e) {
-            logger.warn(e.getMessage(), e);
-            lastException = e;
-            Thread.sleep(500);
-         }
-      }
-      while (expireLoop > System.currentTimeMillis());
-
-      throw lastException;
-   }
-
    protected static JMXConnector getJmxConnector(String hostname, int port) 
throws MalformedURLException {
       // Without this, the RMI server would bind to the default interface IP 
(the user's local IP mostly)
       System.setProperty("java.rmi.server.hostname", hostname);
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
deleted file mode 100644
index e784394933..0000000000
--- 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interruptlm/LargeMessageInterruptTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.tests.soak.interruptlm;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.tests.soak.SoakTestBase;
-import org.apache.activemq.artemis.tests.util.CFUtil;
-import org.apache.activemq.artemis.tests.util.Wait;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-// This is used to kill a server and make sure the server will remove any 
pending files.
-public class LargeMessageInterruptTest extends SoakTestBase {
-
-   public static final String SERVER_NAME_0 = "lminterrupt";
-   private static final String JMX_SERVER_HOSTNAME = "localhost";
-   private static final int JMX_SERVER_PORT_0 = 1099;
-   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-   static String liveURI = "service:jmx:rmi:///jndi/rmi://" + 
JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
-   static ObjectNameBuilder liveNameBuilder = 
ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"lminterrupt", true);
-   Process serverProcess;
-
-   public ConnectionFactory createConnectionFactory(String protocol) {
-      return CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
-   }
-
-   @Before
-   public void before() throws Exception {
-      cleanupData(SERVER_NAME_0);
-      serverProcess = startServer(SERVER_NAME_0, 0, 30000);
-      disableCheckThread();
-   }
-
-   @Test
-   public void testInterruptLargeMessageAMQPTX() throws Throwable {
-      testInterruptLM("AMQP", true);
-   }
-
-   @Test
-   public void testInterruptLargeMessageCORETX() throws Throwable {
-      testInterruptLM("CORE", true);
-   }
-
-   @Test
-   public void testInterruptLargeMessageAMQPNonTX() throws Throwable {
-      testInterruptLM("AMQP", false);
-   }
-
-   @Test
-   public void testInterruptLargeMessageCORENonTX() throws Throwable {
-      testInterruptLM("CORE", false);
-   }
-
-   private void testInterruptLM(String protocol, boolean tx) throws Throwable {
-      final int BODY_SIZE = 500 * 1024;
-      final int NUMBER_OF_MESSAGES = 10; // this is per producer
-      final int SENDING_THREADS = 10;
-      CyclicBarrier startFlag = new CyclicBarrier(SENDING_THREADS);
-      final CountDownLatch done = new CountDownLatch(SENDING_THREADS);
-      final AtomicInteger produced = new AtomicInteger(0);
-      final ConnectionFactory factory = createConnectionFactory(protocol);
-      final AtomicInteger errors = new AtomicInteger(0); // I don't expect 
many errors since this test is disconnecting and reconnecting the server
-      final CountDownLatch killAt = new CountDownLatch(40);
-
-      ExecutorService executorService = 
Executors.newFixedThreadPool(SENDING_THREADS);
-      runAfter(executorService::shutdownNow);
-
-      String queueName = "LargeMessageInterruptTest";
-
-      String body;
-
-      {
-         StringBuffer buffer = new StringBuffer();
-         while (buffer.length() < BODY_SIZE) {
-            buffer.append("LOREM IPSUM WHATEVER THEY SAY IN THERE I DON'T 
REALLY CARE. I'M NOT SURE IF IT'S LOREM, LAUREM, LAUREN, IPSUM OR YPSUM AND I 
DON'T REALLY CARE ");
-         }
-         body = buffer.toString();
-      }
-
-      for (int i = 0; i < SENDING_THREADS; i++) {
-         executorService.execute(() -> {
-            int numberOfMessages = 0;
-            try {
-               Connection connection = factory.createConnection();
-               Session session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-               MessageProducer producer = 
session.createProducer(session.createQueue(queueName));
-
-               startFlag.await(10, TimeUnit.SECONDS);
-               while (numberOfMessages < NUMBER_OF_MESSAGES) {
-                  try {
-                     producer.send(session.createTextMessage(body));
-                     if (tx) {
-                        session.commit();
-                     }
-                     produced.incrementAndGet();
-                     killAt.countDown();
-                     if (numberOfMessages++ % 10 == 0) {
-                        logger.info("Sent {}", numberOfMessages);
-                     }
-                  } catch (Exception e) {
-                     logger.warn(e.getMessage(), e);
-
-                     logger.warn(e.getMessage(), e);
-                     try {
-                        connection.close();
-                     } catch (Throwable ignored) {
-                     }
-
-                     for (int retryNumber = 0; retryNumber < 100; 
retryNumber++) {
-                        try {
-                           Connection ctest = factory.createConnection();
-                           ctest.close();
-                           break;
-                        } catch (Throwable retry) {
-                           Thread.sleep(100);
-                        }
-                     }
-
-                     connection = factory.createConnection();
-                     session = connection.createSession(tx, tx ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-                     producer = 
session.createProducer(session.createQueue(queueName));
-                     connection.start();
-
-                  }
-               }
-            } catch (Exception e) {
-               logger.warn("Error getting the initial connection", e);
-               errors.incrementAndGet();
-            }
-
-            logger.info("Done sending");
-            done.countDown();
-         });
-      }
-
-      Assert.assertTrue(killAt.await(60, TimeUnit.SECONDS));
-      serverProcess.destroyForcibly();
-      serverProcess = startServer(SERVER_NAME_0, 0, 0);
-      QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, 
queueName, queueName, RoutingType.ANYCAST, 5000);
-
-      Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
-      Assert.assertEquals(0, errors.get());
-
-      long numberOfMessages = queueControl.getMessageCount();
-      logger.info("there are {} messages", numberOfMessages);
-
-      try (Connection connection = factory.createConnection()) {
-         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-         MessageConsumer consumer = 
session.createConsumer(session.createQueue(queueName));
-         connection.start();
-         for (int i = 0; i < numberOfMessages; i++) {
-            Message message = consumer.receive(5000);
-            Assert.assertNotNull(message);
-         }
-      }
-
-      File lmFolder = new File(getServerLocation(SERVER_NAME_0) + 
"/data/large-messages");
-      Assert.assertTrue(lmFolder.exists());
-      Wait.assertEquals(0, () -> lmFolder.listFiles().length);
-
-   }
-
-}
\ No newline at end of file

Reply via email to