This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8e4bc33dc4 ARTEMIS-5097 Allow export and import data of undefined
queues
8e4bc33dc4 is described below
commit 8e4bc33dc4d7b4d2031c1bda996c7ce002a3d0e8
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Oct 11 11:04:41 2024 -0400
ARTEMIS-5097 Allow export and import data of undefined queues
This will allow users to eventually undo mistakes after removing a queue by
accident.
---
.../cli/commands/tools/xml/XmlDataExporter.java | 38 ++++++-
.../cli/commands/tools/xml/XmlDataImporter.java | 17 ++-
.../api/core/management/ManagementHelper.java | 3 +
.../codec/PersistentQueueBindingEncoding.java | 4 +
.../persistence/XmlImportExportTest.java | 119 +++++++++++++++++++++
5 files changed, 178 insertions(+), 3 deletions(-)
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
index 174fbfff12..80bdffe757 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
@@ -80,6 +80,11 @@ public final class XmlDataExporter extends DBOption {
private XMLStreamWriter xmlWriter;
+ private Throwable lastError;
+
+ @Option(names = "undefined-prefix", description = "In case a queue does not
exist, this will define the prefix to be used on the message export. Default:
'UndefinedQueue_'")
+ private String undefinedPrefix = "UndefinedQueue_";
+
// an inner map of message refs hashed by the queue ID to which they belong
and then hashed by their record ID
private final Map<Long, HashMap<Long, ReferenceDescribe>> messageRefs = new
HashMap<>();
@@ -100,6 +105,15 @@ public final class XmlDataExporter extends DBOption {
XMLMessageExporter exporter;
+ public String getUndefinedPrefix() {
+ return undefinedPrefix;
+ }
+
+ public XmlDataExporter setUndefinedPrefix(String undefinedPrefix) {
+ this.undefinedPrefix = undefinedPrefix;
+ return this;
+ }
+
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
@@ -331,11 +345,16 @@ public final class XmlDataExporter extends DBOption {
xmlWriter.writeEndDocument();
xmlWriter.flush();
xmlWriter.close();
- } catch (Exception e) {
+ } catch (Throwable e) {
e.printStackTrace();
+ this.lastError = e;
}
}
+ public Throwable getLastError() {
+ return lastError;
+ }
+
private void printBindingsAsXML() throws XMLStreamException {
xmlWriter.writeStartElement(XmlDataConstants.BINDINGS_PARENT);
for (Map.Entry<Long, PersistentAddressBindingEncoding>
addressBindingEncodingEntry : addressBindings.entrySet()) {
@@ -475,7 +494,22 @@ public final class XmlDataExporter extends DBOption {
private List<String> extractQueueNames(HashMap<Long,
DescribeJournal.ReferenceDescribe> refMap) {
List<String> queues = new ArrayList<>();
for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
-
queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
+ String queueName;
+
+ PersistentQueueBindingEncoding persistentQueueBindingEncoding =
queueBindings.get(ref.refEncoding.queueID);
+ if (persistentQueueBindingEncoding == null) {
+ PersistentQueueBindingEncoding undefinedQueue = new
PersistentQueueBindingEncoding();
+ undefinedQueue.setId(ref.refEncoding.queueID);
+ undefinedQueue.replaceQueueName(SimpleString.of(undefinedPrefix +
ref.refEncoding.queueID));
+ undefinedQueue.replaceAddress(undefinedQueue.getQueueName());
+ queueBindings.put(undefinedQueue.getId(), undefinedQueue);
+ queueName = String.valueOf(undefinedQueue.getQueueName());
+ getActionContext().err.println("Queue ID " +
ref.refEncoding.queueID + " not defined. Exporting it as " +
undefinedQueue.getQueueName());
+ } else {
+ queueName =
String.valueOf(persistentQueueBindingEncoding.getQueueName());
+ }
+
+ queues.add(queueName);
}
return queues;
}
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
index e76f6989b8..eac41a2696 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
@@ -288,8 +288,23 @@ public final class XmlDataImporter extends
ConnectionConfigurationAbtract {
}
}
+
+ private void createUndefinedQueue(String name, RoutingType routingType)
throws Exception {
+ ClientSession.QueueQuery queueQuery =
managementSession.queueQuery(SimpleString.of(name));
+ if (!queueQuery.isExists()) {
+
managementSession.createQueue(QueueConfiguration.of(name).setRoutingType(routingType).setDurable(true).setAutoCreateAddress(true));
+ }
+ }
+
+
private void sendMessage(List<String> queues, Message message) throws
Exception {
- final String destination = addressMap.get(queues.get(0));
+ String destination = addressMap.get(queues.get(0));
+ if (destination == null) {
+ createUndefinedQueue(queues.get(0), message.getRoutingType());
+ destination = queues.get(0);
+ addressMap.put(queues.get(0), queues.get(0));
+ }
+
final ByteBuffer buffer = ByteBuffer.allocate(queues.size() * 8);
final boolean debugLog = logger.isDebugEnabled();
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 88090c2ef9..8892a630f0 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -289,6 +289,9 @@ public final class ManagementHelper {
* Returns whether the invocation of the management operation on the server
resource succeeded.
*/
public static boolean hasOperationSucceeded(final Message message) {
+ if (message == null) {
+ return false;
+ }
if (!ManagementHelper.isOperationResult(message)) {
return false;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 71464a5141..5d5551b442 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -186,6 +186,10 @@ public class PersistentQueueBindingEncoding implements
EncodingSupport, QueueBin
this.name = newName;
}
+ public void replaceAddress(SimpleString address) {
+ this.address = address;
+ }
+
@Override
public SimpleString getFilterString() {
return filterString;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
index 9462b08c49..fac8bbc311 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java
@@ -27,11 +27,15 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.util.EnumSet;
import java.util.UUID;
@@ -48,6 +52,7 @@ import
org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporter;
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
import
org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator;
@@ -55,11 +60,13 @@ import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
import
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.unit.util.InVMContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -1269,4 +1276,116 @@ public class XmlImportExportTest extends
ActiveMQTestBase {
locator.close();
server.stop();
}
+
+
+ @Test
+ public void testRemovedQueue() throws Exception {
+
+ String undefinedPrefix = "undef_" + RandomUtil.randomString() + "_";
+ final int numberOfMessages = 100;
+
+ server = createServer(true, true);
+ server.start();
+ forceLong();
+
+ String anycastQueueName = getTestClassName() + RandomUtil.randomString();
+ String multicastQueueName = getTestClassName() +
RandomUtil.randomString();
+ createAnycastPair(server, anycastQueueName);
+ server.addAddressInfo(new
AddressInfo(multicastQueueName).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of(multicastQueueName).setRoutingType(RoutingType.MULTICAST).setAddress(multicastQueueName));
+
+ org.apache.activemq.artemis.core.server.Queue anycastServerQueue =
server.locateQueue(anycastQueueName);
+ assertNotNull(anycastServerQueue);
+ assertEquals(RoutingType.ANYCAST, anycastServerQueue.getRoutingType());
+ org.apache.activemq.artemis.core.server.Queue multiCastServerQueue =
server.locateQueue(multicastQueueName);
+ assertNotNull(multiCastServerQueue);
+ assertEquals(RoutingType.MULTICAST,
multiCastServerQueue.getRoutingType());
+
+ {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(multicastQueueName);
+ Queue queue = session.createQueue(anycastQueueName);
+
+ try (MessageProducer producer = session.createProducer(queue)) {
+ for (int i = 0; i < numberOfMessages; i++) {
+ producer.send(session.createTextMessage("hello " + i));
+ }
+ }
+
+ try (MessageProducer producer = session.createProducer(topic)) {
+ for (int i = 0; i < numberOfMessages; i++) {
+ producer.send(session.createTextMessage("hello " + i));
+ }
+ }
+
+ session.commit();
+ }
+ }
+
+ // this is forcing a situation where the queue was removed and the
messages are still in the journal
+ removeAddressAndQueue(anycastServerQueue);
+ removeAddressAndQueue(multiCastServerQueue);
+
+ server.stop();
+
+ final String fileName = "test.out";
+
+ FileOutputStream fileOutputStream = new FileOutputStream(new
File(getTestDir(), fileName));
+ BufferedOutputStream bufferOut = new
BufferedOutputStream(fileOutputStream);
+ XmlDataExporter xmlDataExporter = new XmlDataExporter();
+
+ xmlDataExporter.setUndefinedPrefix(undefinedPrefix);
+
+ // the journal should still export even though the bindings don't exist
any more
+ // this is to "facilitate" users recovering or undoing mistakes
+ xmlDataExporter.process(bufferOut, getBindingsDir(), getJournalDir(),
getPageDir(), getLargeMessagesDir());
+ bufferOut.close();
+ assertNull(xmlDataExporter.getLastError());
+
+ server.start();
+
+ XmlDataImporter importer = new XmlDataImporter();
+ importer.input = new File(getTestDir(), fileName).getAbsolutePath();
+ importer.execute(new ActionContext());
+
+ {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ connection.start();
+
+ Queue anycastJMSQueue = session.createQueue(undefinedPrefix +
anycastServerQueue.getID());
+ Queue multicastJMSQueue = session.createQueue(undefinedPrefix +
multiCastServerQueue.getID() + "::" + undefinedPrefix +
multiCastServerQueue.getID());
+
+ try (MessageConsumer consumer =
session.createConsumer(anycastJMSQueue)) {
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals("hello " + i, message.getText());
+ }
+ }
+
+ try (MessageConsumer consumer =
session.createConsumer(multicastJMSQueue)) {
+ for (int i = 0; i < numberOfMessages; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals("hello " + i, message.getText());
+ }
+ }
+
+ session.commit();
+ }
+ }
+ }
+
+ private void
removeAddressAndQueue(org.apache.activemq.artemis.core.server.Queue
serverQueue) throws Exception {
+ AddressInfo addressInfo =
server.getAddressInfo(serverQueue.getAddress());
+ long tx = server.getStorageManager().generateID();
+ server.getStorageManager().deleteAddressBinding(tx, addressInfo.getId());
+ server.getStorageManager().deleteQueueBinding(tx, serverQueue.getID());
+ server.getStorageManager().commitBindings(tx);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact