This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e4bc33dc4 ARTEMIS-5097 Allow export and import data of undefined 
queues
8e4bc33dc4 is described below

commit 8e4bc33dc4d7b4d2031c1bda996c7ce002a3d0e8
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Oct 11 11:04:41 2024 -0400

    ARTEMIS-5097 Allow export and import data of undefined queues
    
    This will allow users to eventually undo mistakes after removing a queue by 
accident.
---
 .../cli/commands/tools/xml/XmlDataExporter.java    |  38 ++++++-
 .../cli/commands/tools/xml/XmlDataImporter.java    |  17 ++-
 .../api/core/management/ManagementHelper.java      |   3 +
 .../codec/PersistentQueueBindingEncoding.java      |   4 +
 .../persistence/XmlImportExportTest.java           | 119 +++++++++++++++++++++
 5 files changed, 178 insertions(+), 3 deletions(-)

diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
index 174fbfff12..80bdffe757 100644
--- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
@@ -80,6 +80,11 @@ public final class XmlDataExporter extends DBOption {
 
    private XMLStreamWriter xmlWriter;
 
+   private Throwable lastError;
+
+   @Option(names = "undefined-prefix", description = "In case a queue does not 
exist, this will define the prefix to be used on the message export. Default: 
'UndefinedQueue_'")
+   private String undefinedPrefix = "UndefinedQueue_";
+
    // an inner map of message refs hashed by the queue ID to which they belong 
and then hashed by their record ID
    private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new 
HashMap<>();
 
@@ -100,6 +105,15 @@ public final class XmlDataExporter extends DBOption {
 
    XMLMessageExporter exporter;
 
+   public String getUndefinedPrefix() {
+      return undefinedPrefix;
+   }
+
+   public XmlDataExporter setUndefinedPrefix(String undefinedPrefix) {
+      this.undefinedPrefix = undefinedPrefix;
+      return this;
+   }
+
    @Override
    public Object execute(ActionContext context) throws Exception {
       super.execute(context);
@@ -331,11 +345,16 @@ public final class XmlDataExporter extends DBOption {
          xmlWriter.writeEndDocument();
          xmlWriter.flush();
          xmlWriter.close();
-      } catch (Exception e) {
+      } catch (Throwable e) {
          e.printStackTrace();
+         this.lastError = e;
       }
    }
 
+   public Throwable getLastError() {
+      return lastError;
+   }
+
    private void printBindingsAsXML() throws XMLStreamException {
       xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
       for (Map.Entry<Long, PersistentAddressBindingEncoding> 
addressBindingEncodingEntry : addressBindings.entrySet()) {
@@ -475,7 +494,22 @@ public final class XmlDataExporter extends DBOption {
    private List<String> extractQueueNames(HashMap<Long, 
DescribeJournal.ReferenceDescribe> refMap) {
       List<String> queues = new ArrayList<>();
       for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
-         
queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
+         String queueName;
+
+         PersistentQueueBindingEncoding persistentQueueBindingEncoding = 
queueBindings.get(ref.refEncoding.queueID);
+         if (persistentQueueBindingEncoding == null) {
+            PersistentQueueBindingEncoding undefinedQueue = new 
PersistentQueueBindingEncoding();
+            undefinedQueue.setId(ref.refEncoding.queueID);
+            undefinedQueue.replaceQueueName(SimpleString.of(undefinedPrefix + 
ref.refEncoding.queueID));
+            undefinedQueue.replaceAddress(undefinedQueue.getQueueName());
+            queueBindings.put(undefinedQueue.getId(), undefinedQueue);
+            queueName = String.valueOf(undefinedQueue.getQueueName());
+            getActionContext().err.println("Queue ID " + 
ref.refEncoding.queueID + " not defined. Exporting it as " + 
undefinedQueue.getQueueName());
+         } else {
+            queueName = 
String.valueOf(persistentQueueBindingEncoding.getQueueName());
+         }
+
+         queues.add(queueName);
       }
       return queues;
    }
diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
index e76f6989b8..eac41a2696 100644
--- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
@@ -288,8 +288,23 @@ public final class XmlDataImporter extends 
ConnectionConfigurationAbtract {
       }
    }
 
+
+   private void createUndefinedQueue(String name, RoutingType routingType) 
throws Exception {
+      ClientSession.QueueQuery queueQuery = 
managementSession.queueQuery(SimpleString.of(name));
+      if (!queueQuery.isExists()) {
+         
managementSession.createQueue(QueueConfiguration.of(name).setRoutingType(routingType).setDurable(true).setAutoCreateAddress(true));
+      }
+   }
+
+
    private void sendMessage(List<String> queues, Message message) throws 
Exception {
-      final String destination = addressMap.get(queues.get(0));
+      String destination = addressMap.get(queues.get(0));
+      if (destination == null) {
+         createUndefinedQueue(queues.get(0), message.getRoutingType());
+         destination = queues.get(0);
+         addressMap.put(queues.get(0), queues.get(0));
+      }
+
       final ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
 
       final boolean debugLog = logger.isDebugEnabled();
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 88090c2ef9..8892a630f0 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -289,6 +289,9 @@ public final class ManagementHelper {
     * Returns whether the invocation of the management operation on the server 
resource succeeded.
     */
    public static boolean hasOperationSucceeded(final Message message) {
+      if (message == null) {
+         return false;
+      }
       if (!ManagementHelper.isOperationResult(message)) {
          return false;
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 71464a5141..5d5551b442 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -186,6 +186,10 @@ public class PersistentQueueBindingEncoding implements 
EncodingSupport, QueueBin
       this.name = newName;
    }
 
+   public void replaceAddress(SimpleString address) {
+      this.address = address;
+   }
+
    @Override
    public SimpleString getFilterString() {
       return filterString;
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index 9462b08c49..fac8bbc311 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -27,11 +27,15 @@ import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.util.EnumSet;
 import java.util.UUID;
 
@@ -48,6 +52,7 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter;
 import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator;
@@ -55,11 +60,13 @@ import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.server.JMSServerManager;
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
 import org.apache.activemq.artemis.tests.unit.util.InVMContext;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -1269,4 +1276,116 @@ public class XmlImportExportTest extends 
ActiveMQTestBase {
       locator.close();
       server.stop();
    }
+
+
+   @Test
+   public void testRemovedQueue() throws Exception {
+
+      String undefinedPrefix = "undef_" + RandomUtil.randomString() + "_";
+      final int numberOfMessages = 100;
+
+      server = createServer(true, true);
+      server.start();
+      forceLong();
+
+      String anycastQueueName = getTestClassName() + RandomUtil.randomString();
+      String multicastQueueName = getTestClassName() + 
RandomUtil.randomString();
+      createAnycastPair(server, anycastQueueName);
+      server.addAddressInfo(new 
AddressInfo(multicastQueueName).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false));
+      
server.createQueue(QueueConfiguration.of(multicastQueueName).setRoutingType(RoutingType.MULTICAST).setAddress(multicastQueueName));
+
+      org.apache.activemq.artemis.core.server.Queue anycastServerQueue = 
server.locateQueue(anycastQueueName);
+      assertNotNull(anycastServerQueue);
+      assertEquals(RoutingType.ANYCAST, anycastServerQueue.getRoutingType());
+      org.apache.activemq.artemis.core.server.Queue multiCastServerQueue = 
server.locateQueue(multicastQueueName);
+      assertNotNull(multiCastServerQueue);
+      assertEquals(RoutingType.MULTICAST, 
multiCastServerQueue.getRoutingType());
+
+      {
+         ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Topic topic = session.createTopic(multicastQueueName);
+            Queue queue = session.createQueue(anycastQueueName);
+
+            try (MessageProducer producer = session.createProducer(queue)) {
+               for (int i = 0; i < numberOfMessages; i++) {
+                  producer.send(session.createTextMessage("hello " + i));
+               }
+            }
+
+            try (MessageProducer producer = session.createProducer(topic)) {
+               for (int i = 0; i < numberOfMessages; i++) {
+                  producer.send(session.createTextMessage("hello " + i));
+               }
+            }
+
+            session.commit();
+         }
+      }
+
+      // this is forcing a situation where the queue was removed and the 
messages are still in the journal
+      removeAddressAndQueue(anycastServerQueue);
+      removeAddressAndQueue(multiCastServerQueue);
+
+      server.stop();
+
+      final String fileName = "test.out";
+
+      FileOutputStream fileOutputStream = new FileOutputStream(new 
File(getTestDir(), fileName));
+      BufferedOutputStream bufferOut = new 
BufferedOutputStream(fileOutputStream);
+      XmlDataExporter xmlDataExporter = new XmlDataExporter();
+
+      xmlDataExporter.setUndefinedPrefix(undefinedPrefix);
+
+      // the journal should still export even though the bindings don't exist 
any more
+      // this is to "facilitate" users recovering or undoing mistakes
+      xmlDataExporter.process(bufferOut, getBindingsDir(), getJournalDir(), 
getPageDir(), getLargeMessagesDir());
+      bufferOut.close();
+      assertNull(xmlDataExporter.getLastError());
+
+      server.start();
+
+      XmlDataImporter importer = new XmlDataImporter();
+      importer.input = new File(getTestDir(), fileName).getAbsolutePath();
+      importer.execute(new ActionContext());
+
+      {
+         ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+         try (Connection connection = factory.createConnection()) {
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            connection.start();
+
+            Queue anycastJMSQueue = session.createQueue(undefinedPrefix + 
anycastServerQueue.getID());
+            Queue multicastJMSQueue = session.createQueue(undefinedPrefix + 
multiCastServerQueue.getID() + "::" + undefinedPrefix + 
multiCastServerQueue.getID());
+
+            try (MessageConsumer consumer = 
session.createConsumer(anycastJMSQueue)) {
+               for (int i = 0; i < numberOfMessages; i++) {
+                  TextMessage message = (TextMessage) consumer.receive(5000);
+                  assertNotNull(message);
+                  assertEquals("hello " + i, message.getText());
+               }
+            }
+
+            try (MessageConsumer consumer = 
session.createConsumer(multicastJMSQueue)) {
+               for (int i = 0; i < numberOfMessages; i++) {
+                  TextMessage message = (TextMessage) consumer.receive(5000);
+                  assertNotNull(message);
+                  assertEquals("hello " + i, message.getText());
+               }
+            }
+
+            session.commit();
+         }
+      }
+   }
+
+   private void 
removeAddressAndQueue(org.apache.activemq.artemis.core.server.Queue 
serverQueue) throws Exception {
+      AddressInfo addressInfo = 
server.getAddressInfo(serverQueue.getAddress());
+      long tx = server.getStorageManager().generateID();
+      server.getStorageManager().deleteAddressBinding(tx, addressInfo.getId());
+      server.getStorageManager().deleteQueueBinding(tx, serverQueue.getID());
+      server.getStorageManager().commitBindings(tx);
+   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to