This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new aaa8627795 ARTEMIS-4211 Fix record message id for federated queue 
consumers
aaa8627795 is described below

commit aaa8627795c9b22434a86e00fd7c79289d374393
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Fri Mar 17 18:08:59 2023 +0100

    ARTEMIS-4211 Fix record message id for federated queue consumers
    
    The federated queue consumer has to generate a new id for the messages
    received from the upstream broker because they have an id generated by
    the store manager of the upstream broker.
    
    Co-authored-by: Clebert Suconic <[email protected]>
---
 .../federation/FederatedQueueConsumerImpl.java     |  1 +
 .../integration/federation/FederatedTestBase.java  | 23 +++++-
 .../federation/NettyFederatedQueueTest.java        | 94 ++++++++++++++++++++++
 3 files changed, 116 insertions(+), 2 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index 3dbceba203..1c7b3f2748 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -202,6 +202,7 @@ public class FederatedQueueConsumerImpl implements 
FederatedQueueConsumer, Sessi
             }
          }
 
+         message = message.copy(server.getStorageManager().generateID());
          message = transformer == null ? message : 
transformer.transform(message);
          if (message != null) {
             server.getPostOffice().route(message, true);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
index 34e3763ee1..08c29f61e7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -37,6 +38,14 @@ public class FederatedTestBase extends ActiveMQTestBase {
    protected List<MBeanServer> mBeanServers = new ArrayList<>();
    protected List<ActiveMQServer> servers = new ArrayList<>();
 
+   protected boolean isNetty() {
+      return false;
+   }
+
+   protected boolean isPersistenceEnabled() {
+      return false;
+   }
+
 
    @Override
    @Before
@@ -47,9 +56,19 @@ public class FederatedTestBase extends ActiveMQTestBase {
          mBeanServers.add(mBeanServer);
          Configuration config = createDefaultConfig(i, 
false).setSecurityEnabled(false);
          for (int j = 0; j < numberOfServers(); j++) {
-            config.addConnectorConfiguration("server" + j, "vm://" + j);
+            if (isNetty()) {
+               config.addConnectorConfiguration("server" + j, 
"tcp://localhost:" + (61616 + j) + "?ackBatchSize=1;consumerWindowSize=-1");
+            } else {
+               config.addConnectorConfiguration("server" + j, "vm://" + j);
+            }
          }
-         ActiveMQServer server = 
addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer, false));
+
+         if (isNetty()) {
+            TransportConfiguration acceptorConfig = 
createTransportConfiguration(true, true, generateParams(i, true));
+            config.addAcceptorConfiguration(acceptorConfig);
+         }
+
+         ActiveMQServer server = 
addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer, 
isPersistenceEnabled()));
 
          servers.add(server);
          server.start();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/NettyFederatedQueueTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/NettyFederatedQueueTest.java
new file mode 100644
index 0000000000..a1144dc9a0
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/NettyFederatedQueueTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NettyFederatedQueueTest extends FederatedTestBase {
+
+   @Override
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Override
+   protected boolean isPersistenceEnabled() {
+      return true;
+   }
+
+   @Test
+   public void testFederatedQueueBiDirectionalUpstream() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+
+      String queueName = getName();
+      FederationConfiguration federationConfiguration0 = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", 
queueName);
+      
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
+      getServer(0).getFederationManager().deploy();
+
+      FederationConfiguration federationConfiguration1 = 
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server0", 
queueName);
+      
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1);
+      getServer(1).getFederationManager().deploy();
+
+      ConnectionFactory cf1 = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616?consumerWindowSize=0");
+      ConnectionFactory cf2 = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61617?consumerWindowSize=0");
+
+      Connection connection1 = cf1.createConnection();
+      connection1.start();
+      runAfter(connection1::close);
+      Session session1 = connection1.createSession(true, 
Session.SESSION_TRANSACTED);
+      MessageProducer producer1 = 
session1.createProducer(session1.createQueue(queueName));
+
+      Connection connection2 = cf2.createConnection();
+      connection2.start();
+      runAfter(connection2::close);
+      Session session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer2 = 
session2.createConsumer(session2.createQueue(queueName));
+
+      producer1.send(session1.createTextMessage("Test"));
+      session1.commit();
+
+      Assert.assertNotNull(consumer2.receive(5000));
+
+      for (int i = 0; i < 1000; i++) {
+         producer1.send(session1.createTextMessage("test"));
+      }
+      session1.commit();
+
+      final MessageConsumer consumer1 = 
session1.createConsumer(session1.createQueue(queueName));
+
+      for (int i = 0; i < 100; i++) {
+         Assert.assertNotNull(consumer1.receive(5000));
+         session1.commit();
+         Assert.assertNotNull(consumer2.receive(5000));
+      }
+
+      Assert.assertNotNull(consumer2.receive(5000));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222153"));
+   }
+}

Reply via email to