This is an automated email from the ASF dual-hosted git repository.

wy96f pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d1f062  ARTEMIS-2544 Remove rolledback PageTransactionInfo to free up 
memory
     new 20ca545  This closes #2885
1d1f062 is described below

commit 1d1f0625db80538cc73d11f06c8989602d3e820a
Author: Wei Yang <[email protected]>
AuthorDate: Thu Nov 7 14:53:50 2019 +0800

    ARTEMIS-2544 Remove rolledback PageTransactionInfo to free up memory
---
 .../core/paging/impl/PageTransactionInfoImpl.java  |  5 +-
 .../tests/integration/paging/PagingTest.java       | 61 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index 1b92b90..4684b8b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -113,7 +113,8 @@ public final class PageTransactionInfoImpl implements 
PageTransactionInfo {
             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
             }
-
+         }
+         if (pagingManager != null) {
             pagingManager.removeTransaction(this.transactionID);
          }
          return false;
@@ -242,6 +243,7 @@ public final class PageTransactionInfoImpl implements 
PageTransactionInfo {
       if (lateDeliveries != null) {
          for (LateDelivery pos : lateDeliveries) {
             pos.getSubscription().lateDeliveryRollback(pos.getPagePosition());
+            onUpdate(1, null, 
pos.getSubscription().getPagingStore().getPagingManager());
          }
          lateDeliveries = null;
       }
@@ -283,6 +285,7 @@ public final class PageTransactionInfoImpl implements 
PageTransactionInfo {
             logger.trace("rolled back, position ignored on " + cursor + ", 
position=" + cursorPos);
          }
          cursor.positionIgnored(cursorPos);
+         onUpdate(1, null, cursor.getPagingStore().getPagingManager());
          return true;
       } else {
          if (logger.isTraceEnabled()) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index c020fbe..cda5311 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -70,6 +70,7 @@ import 
org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
@@ -6838,6 +6839,66 @@ public class PagingTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testRollbackPageTransactionBeforeDelivery() throws Exception {
+      testRollbackPageTransaction(true);
+   }
+
+   @Test
+   public void testRollbackPageTransactionAfterDelivery() throws Exception {
+      testRollbackPageTransaction(false);
+   }
+
+   private void testRollbackPageTransaction(boolean rollbackBeforeDelivery) 
throws Exception {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig();
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, 
PagingTest.PAGE_MAX);
+
+      server.start();
+
+      final int numberOfMessages = 2;
+
+      
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(null, null, false, false, true, 
false, 0);
+
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+      Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+      queue.getPageSubscription().getPagingStore().startPaging();
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      if (rollbackBeforeDelivery) {
+         sendMessages(session, producer, numberOfMessages);
+         session.rollback();
+         assertEquals(server.getPagingManager().getTransactions().size(), 1);
+         PageTransactionInfo pageTransactionInfo = 
server.getPagingManager().getTransactions().values().iterator().next();
+         // Make sure rollback happens before delivering messages
+         Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+         session.start();
+         Assert.assertNull(consumer.receiveImmediate());
+         assertTrue(server.getPagingManager().getTransactions().isEmpty());
+      } else {
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+         session.start();
+         sendMessages(session, producer, numberOfMessages);
+         Assert.assertNull(consumer.receiveImmediate());
+         assertEquals(server.getPagingManager().getTransactions().size(), 1);
+         PageTransactionInfo pageTransactionInfo = 
server.getPagingManager().getTransactions().values().iterator().next();
+         session.rollback();
+         Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
+         assertTrue(server.getPagingManager().getTransactions().isEmpty());
+      }
+
+      session.close();
+   }
+
    @Override
    protected Configuration createDefaultConfig(final int serverID, final 
boolean netty) throws Exception {
       Configuration configuration = super.createDefaultConfig(serverID, netty);

Reply via email to