This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 0ce9b67d801a28cef661aca1a60eef2f6a81fd25
Author: gtully <[email protected]>
AuthorDate: Tue Oct 19 11:19:36 2021 +0100

    ARTEMIS-3308 - support federation of large messages
    
    (cherry picked from commit cf85d35355a4c9965a1916066bfdde821c8a84bb)
---
 .../impl/CompressedLargeMessageControllerImpl.java |  5 ++++
 .../core/client/impl/LargeMessageController.java   |  1 +
 .../client/impl/LargeMessageControllerImpl.java    |  7 +++++-
 .../artemis/core/server/ActiveMQServerLogger.java  |  5 ++++
 .../federation/FederatedQueueConsumerImpl.java     | 26 ++++++++++++++++++-
 .../integration/federation/FederatedQueueTest.java | 29 +++++++++++++++++++++-
 6 files changed, 70 insertions(+), 3 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
index 58c3511..a691387 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/CompressedLargeMessageControllerImpl.java
@@ -89,6 +89,11 @@ final class CompressedLargeMessageControllerImpl implements 
LargeMessageControll
    }
 
    @Override
+   public LargeMessageControllerImpl.LargeData take() throws 
InterruptedException {
+      return bufferDelegate.take();
+   }
+
+   @Override
    public int capacity() {
       return -1;
    }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
index 165a4d6..0565c83 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageController.java
@@ -61,4 +61,5 @@ public interface LargeMessageController extends 
ActiveMQBuffer {
     */
    boolean waitCompletion(long timeWait) throws ActiveMQException;
 
+   LargeMessageControllerImpl.LargeData take() throws InterruptedException;
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
index f91878d..be2bb78 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java
@@ -318,6 +318,11 @@ public class LargeMessageControllerImpl implements 
LargeMessageController {
 
    }
 
+   @Override
+   public LargeData take() throws InterruptedException {
+      return largeMessageData.take();
+   }
+
    /**
     * @throws ActiveMQException
     */
@@ -1328,7 +1333,7 @@ public class LargeMessageControllerImpl implements 
LargeMessageController {
       throw new 
IllegalAccessError(LargeMessageControllerImpl.READ_ONLY_ERROR_MESSAGE);
    }
 
-   private static class LargeData {
+   public static class LargeData {
 
       final byte[] chunk;
       final int flowControlSize;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index d10016b..ac75bec 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1763,6 +1763,11 @@ public interface ActiveMQServerLogger extends 
BasicLogger {
    @Message(id = 222304, value = "Unable to load message from journal", format 
= Message.Format.MESSAGE_FORMAT)
    void unableToLoadMessageFromJournal(@Cause Throwable t);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222305, value = "Error federating message {0}.",
+      format = Message.Format.MESSAGE_FORMAT)
+   void federationDispatchError(@Cause Throwable e, String message);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = 
Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index 0f408a1..c060427 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -28,12 +28,17 @@ import 
org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.jboss.logging.Logger;
 
+import static 
org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.LargeData;
+
 public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, 
SessionFailureListener {
 
    private static final Logger logger = 
Logger.getLogger(FederatedQueueConsumerImpl.class);
@@ -174,6 +179,24 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
    @Override
    public void onMessage(ClientMessage clientMessage) {
       try {
+         Message message = clientMessage;
+         if (message instanceof ClientLargeMessageInternal) {
+
+            final StorageManager storageManager = server.getStorageManager();
+            LargeServerMessage lsm = 
storageManager.createLargeMessage(storageManager.generateID(), message);
+
+            LargeData largeData = null;
+            do {
+               // block on reading all pending chunks, ok as we are called 
from an executor
+               largeData = ((ClientLargeMessageInternal) 
clientMessage).getLargeMessageController().take();
+               lsm.addBytes(largeData.getChunk());
+            }
+            while (largeData.isContinues());
+
+            message = lsm.toMessage();
+            lsm.releaseResources(true, true);
+         }
+
          if (server.hasBrokerFederationPlugins()) {
             try {
                server.callBrokerFederationPlugins(plugin -> 
plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
@@ -183,7 +206,7 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
             }
          }
 
-         Message message = transformer == null ? clientMessage : 
transformer.transform(clientMessage);
+         message = transformer == null ? message : 
transformer.transform(message);
          if (message != null) {
             server.getPostOffice().route(message, true);
          }
@@ -198,6 +221,7 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
             }
          }
       } catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.federationDispatchError(e, 
clientMessage.toString());
          try {
             clientSession.rollback();
          } catch (ActiveMQException e1) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
index 1b5b814..6acbc77 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.java
@@ -61,7 +61,7 @@ public class FederatedQueueTest extends FederatedTestBase {
 
    @Override
    protected void configureQueues(ActiveMQServer server) throws Exception {
-      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false).setDefaultConsumerWindowSize(-1));
       createSimpleQueue(server, getName());
    }
 
@@ -244,6 +244,33 @@ public class FederatedQueueTest extends FederatedTestBase {
    }
 
    @Test
+   public void testWithLargeMessage() throws Exception {
+      String queueName = getName();
+
+      FederationConfiguration federationConfiguration = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", 
queueName);
+      
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
+      getServer(0).getFederationManager().deploy();
+
+      ConnectionFactory cf1 = getCF(1);
+      ConnectionFactory cf0 = getCF(0);
+      final String payload = new String(new byte[1 * 1024 * 
1024]).replace('\0','+');
+      try (Connection connection1 = cf1.createConnection(); Connection 
connection0 = cf0.createConnection()) {
+         connection1.start();
+         Session session1 = connection1.createSession();
+         Queue queue1 = session1.createQueue(queueName);
+         MessageProducer producer = session1.createProducer(queue1);
+         producer.send(session1.createTextMessage(payload));
+
+         connection0.start();
+         Session session0 = connection0.createSession();
+         Queue queue0 = session0.createQueue(queueName);
+         MessageConsumer consumer0 = session0.createConsumer(queue0);
+
+         assertNotNull(consumer0.receive(60000));
+      }
+   }
+
+   @Test
    public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist() 
throws Exception {
       String queueName = getName();
       ConnectionFactory cf0 = getCF(0);

Reply via email to