[ 
https://issues.apache.org/jira/browse/ARTEMIS-4084?focusedWorklogId=824725&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-824725
 ]

ASF GitHub Bot logged work on ARTEMIS-4084:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Nov/22 18:45
            Start Date: 09/Nov/22 18:45
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4283:
URL: https://github.com/apache/activemq-artemis/pull/4283#discussion_r1018273339


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java:
##########
@@ -203,20 +223,31 @@ public void addSorted(E e) {
          // This would be an optimization for our usage.
          // avoiding scanning the entire List just to add at the end, so we 
compare the end first.
          if (comparator.compare(tail.val(), e) >= 0) {
+            logger.trace("addTail as compare is after tail {}", e);

Review Comment:
   Perhaps log the tail value as well as e? We can already know what e is when 
reading the logs from the earlier logging by the thread, but we won't 
necessarily know what tail was when the comparison was made.



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
+   protected ActiveMQServer server;
+   protected ClientSession session;
+   protected ClientSessionFactory sf;
+   protected ServerLocator locator;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      Configuration config = createDefaultNettyConfig();
+      config.setCriticalAnalyzer(true);
+      config.setCriticalAnalyzerTimeout(10000);
+      config.setCriticalAnalyzerCheckPeriod(5000);
+      config.setConnectionTTLOverride(5000);
+      config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG);
+      server = createServer(false, config);
+      server.start();
+   }
+
+   @Test
+   public void clientCrashMassiveRollbackTest() throws Exception {
+      final String queueName = "queueName";
+      final int messageCount = 1000000;
+
+      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("(tcp://localhost:61616)");
+      factory.setConsumerWindowSize(-1);
+      factory.setConfirmationWindowSize(10240000);
+      Connection connection = factory.createConnection();
+      connection.start();
+
+      Thread thread = new Thread(() -> {
+         try {
+            Session consumerSession = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue destination = consumerSession.createQueue(queueName);
+            MessageConsumer consumer = 
consumerSession.createConsumer(destination);
+            for (;;) {
+               consumer.receive();
+            }
+         } catch (Exception e) {
+         }
+      });
+
+      locator = createNettyNonHALocator();
+      locator.setConfirmationWindowSize(10240000);
+      sf = createSessionFactory(locator);
+      session = addClientSession(sf.createSession(false, true, true));
+      SendAcknowledgementHandler sendHandler = message -> {
+      };
+      session.setSendAcknowledgementHandler(sendHandler);
+      session.createQueue(new 
QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
+      ClientProducer producer = session.createProducer(queueName);
+      QueueControl queueControl = 
(QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + 
queueName);
+
+      thread.start();
+
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(session.createMessage(true));
+      }
+      producer.close();
+
+      while (queueControl.getDeliveringCount() < messageCount) {
+         Thread.sleep(1000);
+      }
+
+      //Ran the consumer in a thread to be able to kill it "uncleanly"
+      //Thread.stop() is deprecated for the reason I am exploiting here
+      thread.stop();

Review Comment:
   It has been deprecated for 24 years. It is now deprecated _for-removal_ so 
they will likely remove it at some point soon. EDIT: Actually, they have 
_already_ deliberately broken it for Java 20, so I expect it might not be there 
in even 21 (the next LTS): https://bugs.openjdk.org/browse/JDK-8289610. We 
shouldn't be using this.



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java:
##########
@@ -203,20 +223,31 @@ public void addSorted(E e) {
          // This would be an optimization for our usage.
          // avoiding scanning the entire List just to add at the end, so we 
compare the end first.
          if (comparator.compare(tail.val(), e) >= 0) {
+            logger.trace("addTail as compare is after tail {}", e);
             addTail(e);
             return;
          }
 
-         Node<E> fetching = head.next;
-         while (fetching.next != null) {
-            int compareNext = comparator.compare(fetching.next.val(), e);
-            if (compareNext <= 0) {
-               addAfter(fetching, e);
-               return;
+         if (lastAdd != null) { // as an optimization we check against the 
last add rather than always scan.

Review Comment:
   Should probably copy the value into a local variable rather than accessing it 
repeatedly, given it is volatile; that also makes the code less brittle with 
respect to its lack of thread safety (though obviously the whole list isn't 
thread-safe).



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -1165,6 +1165,12 @@ public void addHead(final List<MessageReference> refs, 
boolean scheduling) {
    /* Called when a message is cancelled back into the queue */
    @Override
    public void addSorted(final List<MessageReference> refs, boolean 
scheduling) {
+
+      if (refs.size() > MAX_DELIVERIES_IN_LOOP) {
+         logger.debug("Switching addSortedCall to addSortedHugeLoad {}", name);

Review Comment:
   Add a space before 'call' in the log message; also, the addSortedHugeLoad name doesn't match the method actually called.



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java:
##########
@@ -203,20 +223,31 @@ public void addSorted(E e) {
          // This would be an optimization for our usage.
          // avoiding scanning the entire List just to add at the end, so we 
compare the end first.
          if (comparator.compare(tail.val(), e) >= 0) {
+            logger.trace("addTail as compare is after tail {}", e);
             addTail(e);
             return;
          }
 
-         Node<E> fetching = head.next;
-         while (fetching.next != null) {
-            int compareNext = comparator.compare(fetching.next.val(), e);
-            if (compareNext <= 0) {
-               addAfter(fetching, e);
-               return;
+         if (lastAdd != null) { // as an optimization we check against the 
last add rather than always scan.
+            if (lastAdd.prev != null && lastAdd.prev.val() != null) {
+               if (comparator.compare(lastAdd.prev.val(), e) > 0 && 
comparator.compare(lastAdd.val(), e) < 0) {
+                  logger.trace("addABefore last element {}", e);
+                  addAfter(lastAdd.prev, e);
+                  return;
+               }
+            }
+            if (lastAdd.next != null && lastAdd.next.val() != null) {
+               if (comparator.compare(lastAdd.val(), e) > 0 && 
comparator.compare(lastAdd.next.val(), e) < 0) {
+                  logger.trace("addAfter last element {}", e);
+                  addAfter(lastAdd, e);
+                  return;
+               }
             }
-            fetching = fetching.next;
          }
 
+         if (addSortedScan(e))
+            return;

Review Comment:
   Can we spend that 1 line on braces? :) More consistent and less likely to 
be messed up later.



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java:
##########
@@ -203,20 +223,31 @@ public void addSorted(E e) {
          // This would be an optimization for our usage.
          // avoiding scanning the entire List just to add at the end, so we 
compare the end first.
          if (comparator.compare(tail.val(), e) >= 0) {
+            logger.trace("addTail as compare is after tail {}", e);
             addTail(e);
             return;
          }
 
-         Node<E> fetching = head.next;
-         while (fetching.next != null) {
-            int compareNext = comparator.compare(fetching.next.val(), e);
-            if (compareNext <= 0) {
-               addAfter(fetching, e);
-               return;
+         if (lastAdd != null) { // as an optimization we check against the 
last add rather than always scan.
+            if (lastAdd.prev != null && lastAdd.prev.val() != null) {
+               if (comparator.compare(lastAdd.prev.val(), e) > 0 && 
comparator.compare(lastAdd.val(), e) < 0) {
+                  logger.trace("addABefore last element {}", e);
+                  addAfter(lastAdd.prev, e);

Review Comment:
   addABefore has a typo, but more importantly the code then executes addAfter 
which makes it look wrong since the other log messages in this area refer to 
literal method names, though it isn't wrong since it passes the previous node. 
Also "last element" sounds a bit like it will be the tail, though it clearly 
isn't here.
   
   Something like "adding before most recently added element" might be clearer?



##########
tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
+   protected ActiveMQServer server;
+   protected ClientSession session;
+   protected ClientSessionFactory sf;
+   protected ServerLocator locator;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      Configuration config = createDefaultNettyConfig();
+      config.setCriticalAnalyzer(true);
+      config.setCriticalAnalyzerTimeout(10000);
+      config.setCriticalAnalyzerCheckPeriod(5000);
+      config.setConnectionTTLOverride(5000);
+      config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG);
+      server = createServer(false, config);
+      server.start();
+   }
+
+   @Test
+   public void clientCrashMassiveRollbackTest() throws Exception {
+      final String queueName = "queueName";
+      final int messageCount = 1000000;
+
+      ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("(tcp://localhost:61616)");
+      factory.setConsumerWindowSize(-1);
+      factory.setConfirmationWindowSize(10240000);
+      Connection connection = factory.createConnection();
+      connection.start();
+
+      Thread thread = new Thread(() -> {
+         try {
+            Session consumerSession = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue destination = consumerSession.createQueue(queueName);
+            MessageConsumer consumer = 
consumerSession.createConsumer(destination);
+            for (;;) {
+               consumer.receive();
+            }
+         } catch (Exception e) {
+         }
+      });
+
+      locator = createNettyNonHALocator();
+      locator.setConfirmationWindowSize(10240000);
+      sf = createSessionFactory(locator);
+      session = addClientSession(sf.createSession(false, true, true));
+      SendAcknowledgementHandler sendHandler = message -> {
+      };
+      session.setSendAcknowledgementHandler(sendHandler);
+      session.createQueue(new 
QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
+      ClientProducer producer = session.createProducer(queueName);
+      QueueControl queueControl = 
(QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + 
queueName);
+
+      thread.start();
+
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(session.createMessage(true));
+      }
+      producer.close();
+
+      while (queueControl.getDeliveringCount() < messageCount) {
+         Thread.sleep(1000);
+      }
+
+      //Ran the consumer in a thread to be able to kill it "uncleanly"
+      //Thread.stop() is deprecated for the reason I am exploiting here
+      thread.stop();
+
+
+
+      Assert.assertEquals(messageCount, queueControl.getMessageCount());
+      Assert.assertEquals(ActiveMQServer.SERVER_STATE.STARTED, 
server.getState());
+
+      Thread.sleep(5000);

Review Comment:
   Could it possibly assert something useful after wasting the 5 seconds?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 824725)
    Time Spent: 4h  (was: 3h 50m)

> Rollbacking massive amounts of messages might crash broker
> ----------------------------------------------------------
>
>                 Key: ARTEMIS-4084
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4084
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Anton Roskvist
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> Critical Analyzer triggers, but even if it is set to LOG or disabled the 
> broker is put in such a bad state it becomes unresponsive until restarted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to