This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new aaa8627795 ARTEMIS-4211 Fix record message id for federated queue
consumers
aaa8627795 is described below
commit aaa8627795c9b22434a86e00fd7c79289d374393
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Fri Mar 17 18:08:59 2023 +0100
ARTEMIS-4211 Fix record message id for federated queue consumers
The federated queue consumer has to generate a new id for each message
received from the upstream broker, because the id such a message arrives
with was generated by the storage manager of the upstream broker.
Co-authored-by: Clebert Suconic <[email protected]>
---
.../federation/FederatedQueueConsumerImpl.java | 1 +
.../integration/federation/FederatedTestBase.java | 23 +++++-
.../federation/NettyFederatedQueueTest.java | 94 ++++++++++++++++++++++
3 files changed, 116 insertions(+), 2 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
index 3dbceba203..1c7b3f2748 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
@@ -202,6 +202,7 @@ public class FederatedQueueConsumerImpl implements
FederatedQueueConsumer, Sessi
}
}
+ message = message.copy(server.getStorageManager().generateID());
message = transformer == null ? message :
transformer.transform(message);
if (message != null) {
server.getPostOffice().route(message, true);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
index 34e3763ee1..08c29f61e7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/FederatedTestBase.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -37,6 +38,14 @@ public class FederatedTestBase extends ActiveMQTestBase {
protected List<MBeanServer> mBeanServers = new ArrayList<>();
protected List<ActiveMQServer> servers = new ArrayList<>();
+ protected boolean isNetty() { // hook for subclasses: true makes setUp use tcp://localhost connectors and add an acceptor, false keeps vm:// connectors
+ return false;
+ }
+
+ protected boolean isPersistenceEnabled() { // hook for subclasses: the value is handed to ActiveMQServers.newActiveMQServer(...) as its last argument in setUp
+ return false;
+ }
+
@Override
@Before
@@ -47,9 +56,19 @@ public class FederatedTestBase extends ActiveMQTestBase {
mBeanServers.add(mBeanServer);
Configuration config = createDefaultConfig(i,
false).setSecurityEnabled(false);
for (int j = 0; j < numberOfServers(); j++) {
- config.addConnectorConfiguration("server" + j, "vm://" + j);
+ if (isNetty()) {
+ config.addConnectorConfiguration("server" + j,
"tcp://localhost:" + (61616 + j) + "?ackBatchSize=1;consumerWindowSize=-1");
+ } else {
+ config.addConnectorConfiguration("server" + j, "vm://" + j);
+ }
}
- ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer, false));
+
+ if (isNetty()) {
+ TransportConfiguration acceptorConfig =
createTransportConfiguration(true, true, generateParams(i, true));
+ config.addAcceptorConfiguration(acceptorConfig);
+ }
+
+ ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(config, mBeanServer,
isPersistenceEnabled()));
servers.add(server);
server.start();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/NettyFederatedQueueTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/NettyFederatedQueueTest.java
new file mode 100644
index 0000000000..a1144dc9a0
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/federation/NettyFederatedQueueTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.federation;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.config.FederationConfiguration;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class NettyFederatedQueueTest extends FederatedTestBase {
+ // Federated-queue test variant that switches the base class to Netty transport and enables persistence.
+ @Override
+ protected boolean isNetty() {
+ return true;
+ }
+
+ @Override
+ protected boolean isPersistenceEnabled() {
+ return true;
+ }
+
+ @Test
+ public void testFederatedQueueBiDirectionalUpstream() throws Exception {
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+ // Bi-directional setup: server 0 gets a queue upstream named "server1" and server 1 gets one named "server0", both for the same queue.
+ String queueName = getName();
+ FederationConfiguration federationConfiguration0 =
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1",
queueName);
+
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration0);
+ getServer(0).getFederationManager().deploy();
+
+ FederationConfiguration federationConfiguration1 =
FederatedTestUtil.createQueueUpstreamFederationConfiguration("server0",
queueName);
+
getServer(1).getConfiguration().getFederationConfigurations().add(federationConfiguration1);
+ getServer(1).getFederationManager().deploy();
+ // Client factories: cf1 connects to port 61616 and cf2 to port 61617, both with consumerWindowSize=0.
+ ConnectionFactory cf1 = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616?consumerWindowSize=0");
+ ConnectionFactory cf2 = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61617?consumerWindowSize=0");
+ // Connection 1 uses a transacted session and produces to the queue.
+ Connection connection1 = cf1.createConnection();
+ connection1.start();
+ runAfter(connection1::close);
+ Session session1 = connection1.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer1 =
session1.createProducer(session1.createQueue(queueName));
+ // Connection 2 uses an auto-acknowledge session and consumes from the same queue name.
+ Connection connection2 = cf2.createConnection();
+ connection2.start();
+ runAfter(connection2::close);
+ Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer2 =
session2.createConsumer(session2.createQueue(queueName));
+ // Sanity check: one committed message sent through connection 1 must reach the consumer on connection 2.
+ producer1.send(session1.createTextMessage("Test"));
+ session1.commit();
+
+ Assert.assertNotNull(consumer2.receive(5000));
+ // Commit a batch of 1000 messages through connection 1 before a second consumer is attached there.
+ for (int i = 0; i < 1000; i++) {
+ producer1.send(session1.createTextMessage("test"));
+ }
+ session1.commit();
+
+ final MessageConsumer consumer1 =
session1.createConsumer(session1.createQueue(queueName));
+ // Alternate receives between the two consumers 100 times; consumer1's session commits after each receive.
+ for (int i = 0; i < 100; i++) {
+ Assert.assertNotNull(consumer1.receive(5000));
+ session1.commit();
+ Assert.assertNotNull(consumer2.receive(5000));
+ }
+
+ Assert.assertNotNull(consumer2.receive(5000));
+ // The captured log must not contain AMQ222153 (NOTE(review): presumably the journal "cannot locate record for message id" warning - confirm against ActiveMQServerLogger).
+ Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222153"));
+ }
+}