This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 15c2dbb  ARTEMIS-3521 Option to disable duplicate detection on openwire failover clients
15c2dbb is described below

commit 15c2dbb1c36fe4a41467774f448ab45bf85a6bb7
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Oct 6 16:03:48 2021 -0400

    ARTEMIS-3521 Option to disable duplicate detection on openwire failover clients
---
 .../protocol/openwire/OpenWireProtocolManager.java |  13 +++
 .../core/protocol/openwire/amq/AMQSession.java     |   7 +-
 .../impl/journal/OperationContextUnitTest.java     |   8 ++
 .../openwire/FailoverDuplicateIDUsageTest.java     | 112 +++++++++++++++++++++
 4 files changed, 139 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 852f4dc..0ce2093 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -120,6 +120,8 @@ public class OpenWireProtocolManager  extends AbstractProtocolManager<Command, O
    private boolean updateClusterClients = false;
    private boolean updateClusterClientsOnRemove = false;
 
+   private boolean openwireUseDuplicateDetectionOnFailover = true;
+
    //http://activemq.apache.org/activemq-inactivitymonitor.html
    private long maxInactivityDuration = 30 * 1000L;
    private long maxInactivityDurationInitalDelay = 10 * 1000L;
@@ -192,6 +194,17 @@ public class OpenWireProtocolManager  extends AbstractProtocolManager<Command, O
       redirectHandler = new OpenWireRedirectHandler(server, this);
    }
 
+   /** Is Duplicate detection enabled when used with failover clients. */
+   public boolean isOpenwireUseDuplicateDetectionOnFailover() {
+      return openwireUseDuplicateDetectionOnFailover;
+   }
+
+   /** should it use duplicate detection on failover clients. */
+   public OpenWireProtocolManager setOpenwireUseDuplicateDetectionOnFailover(boolean openwireUseDuplicateDetectionOnFailover) {
+      this.openwireUseDuplicateDetectionOnFailover = openwireUseDuplicateDetectionOnFailover;
+      return this;
+   }
+
    @Override
    public void nodeUP(TopologyMember member, boolean last) {
       if (topologyMap.put(member.getNodeId(), member) == null) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 954edbc..e3e0b0e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -364,6 +364,10 @@ public class AMQSession implements SessionCallback {
       connection.disconnect(forcePossibleFailoverReconnect.getMessage(), true);
    }
 
+   private static boolean isTemporary(ProducerInfo producerInfo) {
+      return producerInfo != null && producerInfo.getDestination() != null && producerInfo.getDestination().isTemporary();
+   }
+
    public void send(final ProducerInfo producerInfo,
                     final Message messageSend,
                     final boolean sendProducerAck) throws Exception {
@@ -391,7 +395,8 @@ public class AMQSession implements SessionCallback {
       * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last sequence id received to
       * the client).  To handle this in Artemis we use a duplicate ID cache.  To do this we check to see if the
       * message comes from failover connection.  If so we add a DUPLICATE_ID to handle duplicates after a resend. */
-      if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString())) {
+
+      if (connection.getContext().isFaultTolerant() && protocolManager.isOpenwireUseDuplicateDetectionOnFailover() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString()) && !isTemporary(producerInfo)) {
          originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString(messageSend.getMessageId().toString()));
       }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
index 8d2cdfa..830d159 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
@@ -234,6 +234,14 @@ public class OperationContextUnitTest extends ActiveMQTestBase {
          assertTrue(latch1.await(10, TimeUnit.SECONDS));
          assertTrue(latch2.await(10, TimeUnit.SECONDS));
 
+         if (impl.storeOnlyTasks != null) {
+            Assert.assertEquals(0, impl.storeOnlyTasks.size());
+         }
+
+         if (impl.tasks != null) {
+            Assert.assertEquals(0, impl.tasks.size());
+         }
+
       } finally {
          executor.shutdown();
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FailoverDuplicateIDUsageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FailoverDuplicateIDUsageTest.java
new file mode 100644
index 0000000..3231256
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/FailoverDuplicateIDUsageTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FailoverDuplicateIDUsageTest extends ActiveMQTestBase {
+
+   ActiveMQServer server;
+
+
+   @Before
+   public void setupServer() throws Exception {
+      server = createServer(true, true);
+   }
+
+
+   @Test
+   public void testTempQueue() throws Exception {
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().clearAcceptorConfigurations().addAcceptorConfiguration("openwire", "tcp://localhost:61616?openwireUseDuplicateDetectionOnFailover=true");
+      server.start();
+      server.waitForActivation(10, TimeUnit.SECONDS);
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createTemporaryQueue();
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("hello"));
+         }
+         Assert.assertEquals(0, countDuplicateDetection(server.getConfiguration()));
+      }
+
+
+   }
+
+   @Test
+   public void testNoDuplicate() throws Exception {
+      testDuplicate(false);
+   }
+
+   @Test
+   public void testDuplicate() throws Exception {
+      testDuplicate(true);
+   }
+
+   private void testDuplicate(boolean useDuplicate) throws Exception {
+      String queueName = getName();
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().clearAcceptorConfigurations().addAcceptorConfiguration("openwire", "tcp://localhost:61616?openwireUseDuplicateDetectionOnFailover=" + useDuplicate);
+      server.start();
+      server.waitForActivation(10, TimeUnit.SECONDS);
+      server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("hello"));
+         }
+      }
+      server.stop();
+
+      Assert.assertEquals(useDuplicate ? 100 : 0, countDuplicateDetection(server.getConfiguration()));
+
+   }
+
+   private int countDuplicateDetection(Configuration configuration) throws Exception {
+      HashMap<Integer, AtomicInteger> maps = countJournal(configuration);
+      AtomicInteger value = maps.get((int)JournalRecordIds.DUPLICATE_ID);
+      return value == null ? 0 : value.get();
+   }
+
+
+}

Reply via email to