This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new c996888f43 ARTEMIS-5634 Mirror source and routing should use a TX
c996888f43 is described below

commit c996888f43575ce69e4b2148694eefa72b7926fd
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Thu Aug 21 22:30:56 2025 -0400

    ARTEMIS-5634 Mirror source and routing should use a TX
    
    if a non transaction send happens while on mirror,
    the broker should upgrade the operation to an internal transaction
    to make sure the whole thing happens atomically.
---
 .../core/postoffice/impl/PostOfficeImpl.java       |  14 +-
 .../amqp/connect/StopDuringMirrorTest.java         | 216 +++++++++++++++++++++
 2 files changed, 229 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index ee8138ca02..17a055bc39 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1659,7 +1659,15 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
                             final boolean direct) throws Exception {
       final ArrayList<MessageReference> refs = new ArrayList<>();
 
-      final Transaction tx = context.getTransaction();
+      Transaction tx = context.getTransaction();
+      boolean startedTX = false;
+
+      if (tx == null && mirrorControllerSource != null) {
+         context.setReusable(false);
+         tx = new TransactionImpl(storageManager).setAsync(true);
+         context.setTransaction(tx);
+         startedTX = true;
+      }
 
       final Long deliveryTime;
 
@@ -1732,6 +1740,10 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
             }
          });
       }
+
+      if (startedTX) {
+         tx.commit();
+      }
    }
 
    public static void processReferences(List<MessageReference> refs, boolean 
direct) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
new file mode 100644
index 0000000000..f4df4e7bf7
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/StopDuringMirrorTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StopDuringMirrorTest extends ActiveMQTestBase {
+
+
+   private static final int EXIT_AS_EXPECTED = 7;
+   private static final int ERROR = 11;
+
+   private static String QUEUE_NAME = "StopDuringMirror";
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final int NUMBER_OF_MESSAGES = 5;
+
+   ActiveMQServer server;
+
+   ExecutorService executorService;
+
+   public static void main(String[] arg) {
+      try {
+         StopDuringMirrorTest stopDuringMirrorTest = new 
StopDuringMirrorTest();
+         stopDuringMirrorTest.spawnedRun(arg[0]);
+         for (;;) {
+            Thread.sleep(1000);
+         }
+      } catch (Throwable e) {
+         e.printStackTrace();
+         System.exit(ERROR);
+      }
+   }
+
+
+   public void spawnedRun(String temporaryFolder) throws Exception {
+      this.temporaryFolder = new File(temporaryFolder);
+
+      ActiveMQServer server = createServer();
+      server.start();
+      MirrorController controller = 
server.getPostOffice().getMirrorControlSource();
+      assertNotNull(controller);
+      server.getPostOffice().setMirrorControlSource(new 
KillServerController(controller));
+      this.server = server;
+   }
+
+   @Test
+   public void testStopDuringRoute() throws Exception {
+      Process process = 
SpawnedVMSupport.spawnVM(StopDuringMirrorTest.class.getName(), 
temporaryFolder.getAbsolutePath());
+      runAfter(process::destroyForcibly);
+      ServerUtil.waitForServerToStart(0, (int)TimeUnit.SECONDS.toMillis(20));
+
+      executorService = Executors.newSingleThreadExecutor();
+      runAfter(executorService::shutdown);
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:61616");
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+         producer.send(session.createTextMessage("hello"));
+      } catch (Throwable expected) {
+         logger.info(expected.getMessage(), expected);
+      }
+
+      assertTrue(process.waitFor(5, TimeUnit.MINUTES));
+
+      assertEquals(EXIT_AS_EXPECTED, process.exitValue());
+
+      server = createServer();
+      server.start();
+
+      Queue queue = server.locateQueue(QUEUE_NAME);
+
+      assertEquals(0, queue.getMessageCount());
+
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         connection.start();
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+         assertNull(consumer.receiveNoWait());
+      } catch (Exception expected) {
+         logger.info(expected.getMessage(), expected);
+      }
+
+      assertEquals(0, queue.getMessageCount());
+   }
+
+   private ActiveMQServer createServer() throws Exception {
+      Configuration configuration = createDefaultConfig(0, false);
+      configuration.getAddressConfigurations().clear();
+      configuration.setResolveProtocols(true);
+      
configuration.setMirrorAckManagerRetryDelay(100).setMirrorAckManagerPageAttempts(5).setMirrorAckManagerQueueAttempts(5);
+      
configuration.addQueueConfiguration(QueueConfiguration.of(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST));
+      configuration.addAcceptorConfiguration("clients", 
"tcp://localhost:61616");
+      AMQPBrokerConnectConfiguration brokerConnectConfiguration = new 
AMQPBrokerConnectConfiguration("willNeverConnect", 
"tcp://localhost:61617").setRetryInterval(60_000).setReconnectAttempts(-1);
+      AMQPMirrorBrokerConnectionElement mirror = new 
AMQPMirrorBrokerConnectionElement().setDurable(true);
+      brokerConnectConfiguration.addMirror(mirror);
+      configuration.addAMQPConnection(brokerConnectConfiguration);
+      org.apache.activemq.artemis.core.server.ActiveMQServer server = 
createServer(true, configuration);
+      server.setIdentity("server1");
+      return server;
+   }
+
+   class KillServerController implements MirrorController {
+      MirrorController target;
+
+      KillServerController(MirrorController target) {
+         this.target = target;
+      }
+
+      @Override
+      public boolean isRetryACK() {
+         return target.isRetryACK();
+      }
+
+      @Override
+      public void addAddress(AddressInfo addressInfo) throws Exception {
+         target.addAddress(addressInfo);
+      }
+
+      @Override
+      public void deleteAddress(AddressInfo addressInfo) throws Exception {
+         target.deleteAddress(addressInfo);
+      }
+
+      @Override
+      public void createQueue(QueueConfiguration queueConfiguration) throws 
Exception {
+         target.createQueue(queueConfiguration);
+      }
+
+      @Override
+      public void deleteQueue(SimpleString addressName, SimpleString 
queueName) throws Exception {
+         target.deleteQueue(addressName, queueName);
+      }
+
+      @Override
+      public void sendMessage(Transaction tx, Message message, RoutingContext 
context) {
+         try {
+            System.out.println("..... send message" + message);
+            server.getStorageManager().getContext().waitCompletion(5000);
+            // to make the client to exit earlier
+            server.getRemotingService().stop(false);
+            // this is exiting a spawned server
+            System.exit(EXIT_AS_EXPECTED);
+         } catch (Exception e) {
+         }
+      }
+
+      @Override
+      public void postAcknowledge(MessageReference ref, AckReason reason) 
throws Exception {
+         target.postAcknowledge(ref, reason);
+      }
+
+      @Override
+      public void preAcknowledge(Transaction tx, MessageReference ref, 
AckReason reason) throws Exception {
+         target.preAcknowledge(tx, ref, reason);
+      }
+
+      @Override
+      public String getRemoteMirrorId() {
+         return target.getRemoteMirrorId();
+      }
+   }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to