This is an automated email from the ASF dual-hosted git repository.

gemmellr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git

commit ed9d372d23c018cef0036089355f01705e16cb53
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Tue Apr 28 11:51:04 2026 +0200

    ARTEMIS-6033 Close the connection factory when the CLI transfer command ends
---
 .../artemis/cli/commands/messages/Transfer.java    | 182 ++++++++++-----------
 1 file changed, 91 insertions(+), 91 deletions(-)

diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java
index ea543d9207..efdcaeecf9 100644
--- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Transfer.java
@@ -342,108 +342,108 @@ public class Transfer extends InputAbstract {
    }
 
    private int doTransfer(ActionContext context) throws Exception {
-      ConnectionFactoryClosable sourceConnectionFactory = 
createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, 
sourcePassword, sourceClientID);
-      Connection sourceConnection = sourceConnectionFactory.createConnection();
-
-      Session sourceSession = 
sourceConnection.createSession(Session.SESSION_TRANSACTED);
-      Destination sourceDestination = createDestination("source", 
sourceSession, sourceQueue, sourceTopic);
-      MessageConsumer consumer = null;
-      if (sourceDestination instanceof Queue) {
-         if (filter != null) {
-            consumer = sourceSession.createConsumer(sourceDestination, filter);
-         } else {
-            consumer = sourceSession.createConsumer(sourceDestination);
-         }
-      } else if (sourceDestination instanceof Topic topic) {
+      try (ConnectionFactoryClosable sourceConnectionFactory = 
createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, 
sourcePassword, sourceClientID);
+           Connection sourceConnection = 
sourceConnectionFactory.createConnection();
+           Session sourceSession = 
sourceConnection.createSession(Session.SESSION_TRANSACTED)) {
 
-         if (durableConsumer != null) {
-            if (filter != null) {
-               consumer = sourceSession.createDurableConsumer(topic, 
durableConsumer);
-            } else {
-               consumer = sourceSession.createDurableConsumer(topic, 
durableConsumer, filter, noLocal);
-            }
-         } else if (sharedDurableSubscription != null) {
+         Destination sourceDestination = createDestination("source", 
sourceSession, sourceQueue, sourceTopic);
+         MessageConsumer consumer = null;
+         if (sourceDestination instanceof Queue) {
             if (filter != null) {
-               consumer = sourceSession.createSharedDurableConsumer(topic, 
sharedDurableSubscription, filter);
+               consumer = sourceSession.createConsumer(sourceDestination, 
filter);
             } else {
-               consumer = sourceSession.createSharedDurableConsumer(topic, 
sharedDurableSubscription);
+               consumer = sourceSession.createConsumer(sourceDestination);
             }
-         } else if (sharedSubscription != null) {
-            if (filter != null) {
-               consumer = sourceSession.createSharedConsumer(topic, 
sharedSubscription, filter);
+         } else if (sourceDestination instanceof Topic topic) {
+
+            if (durableConsumer != null) {
+               if (filter != null) {
+                  consumer = sourceSession.createDurableConsumer(topic, 
durableConsumer);
+               } else {
+                  consumer = sourceSession.createDurableConsumer(topic, 
durableConsumer, filter, noLocal);
+               }
+            } else if (sharedDurableSubscription != null) {
+               if (filter != null) {
+                  consumer = sourceSession.createSharedDurableConsumer(topic, 
sharedDurableSubscription, filter);
+               } else {
+                  consumer = sourceSession.createSharedDurableConsumer(topic, 
sharedDurableSubscription);
+               }
+            } else if (sharedSubscription != null) {
+               if (filter != null) {
+                  consumer = sourceSession.createSharedConsumer(topic, 
sharedSubscription, filter);
+               } else {
+                  consumer = sourceSession.createSharedConsumer(topic, 
sharedSubscription);
+               }
             } else {
-               consumer = sourceSession.createSharedConsumer(topic, 
sharedSubscription);
+               throw new IllegalArgumentException("you must specify either 
--durable-consumer, --shared-durable-subscription or --shared-subscription with 
a JMS topic");
             }
-         } else {
-            throw new IllegalArgumentException("you must specify either 
--durable-consumer, --shared-durable-subscription or --shared-subscription with 
a JMS topic");
-         }
-      }
-
-      ConnectionFactoryClosable targetConnectionFactory = 
createConnectionFactory("target", targetProtocol, targetURL, targetUser, 
targetPassword, null);
-      Connection targetConnection = targetConnectionFactory.createConnection();
-      Session targetSession = 
targetConnection.createSession(Session.SESSION_TRANSACTED);
-      Destination targetDestination = createDestination("target", 
targetSession, targetQueue, targetTopic);
-      MessageProducer producer = 
targetSession.createProducer(targetDestination);
-
-      if (sourceURL.equals(targetURL) && 
sourceDestination.equals(targetDestination)) {
-         context.out.println("You cannot transfer between " + sourceURL + "/" 
+ sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + 
"That would create an infinite recursion.");
-         throw new IllegalArgumentException("cannot use " + sourceDestination 
+ " == " + targetDestination);
-      }
-
-      sourceConnection.start();
-      int pending = 0, total = 0;
-      while (total < messageCount) {
-
-         Message receivedMessage;
-         if (receiveTimeout < 0) {
-            receivedMessage = consumer.receive();
-         } else if (receiveTimeout == 0) {
-            receivedMessage = consumer.receiveNoWait();
-         } else {
-            receivedMessage = consumer.receive(receiveTimeout);
          }
 
-         if (receivedMessage == null) {
-            if (isVerbose()) {
-               context.out.println("could not receive any more messages");
-            }
-            break;
-         }
-         producer.send(receivedMessage);
-         pending++;
-         total++;
-
-         if (isVerbose()) {
-            context.out.println("Received message " + total + " with " + 
pending + " messages pending to be commited");
-         }
-         if (pending > commitInterval) {
-            context.out.println("Transferred " + pending + " messages of " + 
total);
-            pending = 0;
-            targetSession.commit();
-            if (!isCopy()) {
-               sourceSession.commit();
+         try (MessageConsumer sourceConsumer = consumer;
+              ConnectionFactoryClosable targetConnectionFactory = 
createConnectionFactory("target", targetProtocol, targetURL, targetUser, 
targetPassword, null);
+              Connection targetConnection = 
targetConnectionFactory.createConnection();
+              Session targetSession = 
targetConnection.createSession(Session.SESSION_TRANSACTED)) {
+
+            Destination targetDestination = createDestination("target", 
targetSession, targetQueue, targetTopic);
+
+            try (MessageProducer producer = 
targetSession.createProducer(targetDestination)) {
+
+               if (sourceURL.equals(targetURL) && 
sourceDestination.equals(targetDestination)) {
+                  context.out.println("You cannot transfer between " + 
sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + 
targetDestination + ".\n" + "That would create an infinite recursion.");
+                  throw new IllegalArgumentException("cannot use " + 
sourceDestination + " == " + targetDestination);
+               }
+
+               sourceConnection.start();
+               int pending = 0, total = 0;
+               while (total < messageCount) {
+
+                  Message receivedMessage;
+                  if (receiveTimeout < 0) {
+                     receivedMessage = sourceConsumer.receive();
+                  } else if (receiveTimeout == 0) {
+                     receivedMessage = sourceConsumer.receiveNoWait();
+                  } else {
+                     receivedMessage = sourceConsumer.receive(receiveTimeout);
+                  }
+
+                  if (receivedMessage == null) {
+                     if (isVerbose()) {
+                        context.out.println("could not receive any more 
messages");
+                     }
+                     break;
+                  }
+                  producer.send(receivedMessage);
+                  pending++;
+                  total++;
+
+                  if (isVerbose()) {
+                     context.out.println("Received message " + total + " with 
" + pending + " messages pending to be commited");
+                  }
+                  if (pending > commitInterval) {
+                     context.out.println("Transferred " + pending + " messages 
of " + total);
+                     pending = 0;
+                     targetSession.commit();
+                     if (!isCopy()) {
+                        sourceSession.commit();
+                     }
+                  }
+               }
+
+               context.out.println("Transferred a total of " + total + " 
messages");
+
+               if (pending != 0) {
+                  targetSession.commit();
+                  if (isCopy()) {
+                     sourceSession.rollback();
+                  } else {
+                     sourceSession.commit();
+                  }
+               }
+
+               return total;
             }
          }
       }
-
-      context.out.println("Transferred a total of " + total + " messages");
-
-      if (pending != 0) {
-         targetSession.commit();
-         if (isCopy()) {
-            sourceSession.rollback();
-         } else {
-            sourceSession.commit();
-         }
-      }
-
-      sourceConnection.close();
-      sourceConnectionFactory.close();
-
-      targetConnection.close();
-      targetConnectionFactory.close();
-
-      return total;
    }
 
    Destination createDestination(String role, Session session, String queue, 
String topic) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to