This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 15c2dbb ARTEMIS-3521 Option to disable duplicate detection on
openwire failover clients
15c2dbb is described below
commit 15c2dbb1c36fe4a41467774f448ab45bf85a6bb7
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Oct 6 16:03:48 2021 -0400
ARTEMIS-3521 Option to disable duplicate detection on openwire failover
clients
---
.../protocol/openwire/OpenWireProtocolManager.java | 13 +++
.../core/protocol/openwire/amq/AMQSession.java | 7 +-
.../impl/journal/OperationContextUnitTest.java | 8 ++
.../openwire/FailoverDuplicateIDUsageTest.java | 112 +++++++++++++++++++++
4 files changed, 139 insertions(+), 1 deletion(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 852f4dc..0ce2093 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -120,6 +120,8 @@ public class OpenWireProtocolManager extends
AbstractProtocolManager<Command, O
private boolean updateClusterClients = false;
private boolean updateClusterClientsOnRemove = false;
+ private boolean openwireUseDuplicateDetectionOnFailover = true;
+
//http://activemq.apache.org/activemq-inactivitymonitor.html
private long maxInactivityDuration = 30 * 1000L;
private long maxInactivityDurationInitalDelay = 10 * 1000L;
@@ -192,6 +194,17 @@ public class OpenWireProtocolManager extends
AbstractProtocolManager<Command, O
redirectHandler = new OpenWireRedirectHandler(server, this);
}
+ /** Returns whether duplicate detection is enabled for failover clients. */
+ public boolean isOpenwireUseDuplicateDetectionOnFailover() {
+ return openwireUseDuplicateDetectionOnFailover;
+ }
+
+ /** Sets whether duplicate detection is used for failover clients; returns this manager. */
+ public OpenWireProtocolManager
setOpenwireUseDuplicateDetectionOnFailover(boolean
openwireUseDuplicateDetectionOnFailover) {
+ this.openwireUseDuplicateDetectionOnFailover =
openwireUseDuplicateDetectionOnFailover;
+ return this;
+ }
+
@Override
public void nodeUP(TopologyMember member, boolean last) {
if (topologyMap.put(member.getNodeId(), member) == null) {
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 954edbc..e3e0b0e 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -364,6 +364,10 @@ public class AMQSession implements SessionCallback {
connection.disconnect(forcePossibleFailoverReconnect.getMessage(), true);
}
+ private static boolean isTemporary(ProducerInfo producerInfo) {
+ return producerInfo != null && producerInfo.getDestination() != null &&
producerInfo.getDestination().isTemporary();
+ }
+
public void send(final ProducerInfo producerInfo,
final Message messageSend,
final boolean sendProducerAck) throws Exception {
@@ -391,7 +395,8 @@ public class AMQSession implements SessionCallback {
* not receive acks will be resent. (ActiveMQ broker handles this by
returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache.
To do this we check to see if the
* message comes from failover connection. If so we add a DUPLICATE_ID
to handle duplicates after a resend. */
- if (connection.getContext().isFaultTolerant() &&
!messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString()))
{
+
+ if (connection.getContext().isFaultTolerant() &&
protocolManager.isOpenwireUseDuplicateDetectionOnFailover() &&
!messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString())
&& !isTemporary(producerInfo)) {
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID,
SimpleString.toSimpleString(messageSend.getMessageId().toString()));
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
index 8d2cdfa..830d159 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
@@ -234,6 +234,14 @@ public class OperationContextUnitTest extends
ActiveMQTestBase {
assertTrue(latch1.await(10, TimeUnit.SECONDS));
assertTrue(latch2.await(10, TimeUnit.SECONDS));
+ if (impl.storeOnlyTasks != null) {
+ Assert.assertEquals(0, impl.storeOnlyTasks.size());
+ }
+
+ if (impl.tasks != null) {
+ Assert.assertEquals(0, impl.tasks.size());
+ }
+
} finally {
executor.shutdown();
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FailoverDuplicateIDUsageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FailoverDuplicateIDUsageTest.java
new file mode 100644
index 0000000..3231256
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FailoverDuplicateIDUsageTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FailoverDuplicateIDUsageTest extends ActiveMQTestBase {
+
+ ActiveMQServer server;
+
+
+ @Before
+ public void setupServer() throws Exception {
+ server = createServer(true, true);
+ }
+
+
+ @Test
+ public void testTempQueue() throws Exception {
+ server.getConfiguration().getAcceptorConfigurations().clear();
+
server.getConfiguration().clearAcceptorConfigurations().addAcceptorConfiguration("openwire",
"tcp://localhost:61616?openwireUseDuplicateDetectionOnFailover=true");
+ server.start();
+ server.waitForActivation(10, TimeUnit.SECONDS);
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createTemporaryQueue();
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 100; i++) {
+ producer.send(session.createTextMessage("hello"));
+ }
+ Assert.assertEquals(0,
countDuplicateDetection(server.getConfiguration()));
+ }
+
+
+ }
+
+ @Test
+ public void testNoDuplicate() throws Exception {
+ testDuplicate(false);
+ }
+
+ @Test
+ public void testDuplicate() throws Exception {
+ testDuplicate(true);
+ }
+
+ private void testDuplicate(boolean useDuplicate) throws Exception {
+ String queueName = getName();
+ server.getConfiguration().getAcceptorConfigurations().clear();
+
server.getConfiguration().clearAcceptorConfigurations().addAcceptorConfiguration("openwire",
"tcp://localhost:61616?openwireUseDuplicateDetectionOnFailover=" +
useDuplicate);
+ server.start();
+ server.waitForActivation(10, TimeUnit.SECONDS);
+ server.createQueue(new
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < 100; i++) {
+ producer.send(session.createTextMessage("hello"));
+ }
+ }
+ server.stop();
+
+ Assert.assertEquals(useDuplicate ? 100 : 0,
countDuplicateDetection(server.getConfiguration()));
+
+ }
+
+ private int countDuplicateDetection(Configuration configuration) throws
Exception {
+ HashMap<Integer, AtomicInteger> maps = countJournal(configuration);
+ AtomicInteger value = maps.get((int)JournalRecordIds.DUPLICATE_ID);
+ return value == null ? 0 : value.get();
+ }
+
+
+}