This is an automated email from the ASF dual-hosted git repository.
wy96f pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 1d1f062 ARTEMIS-2544 Remove rolled-back PageTransactionInfo to free up
memory
new 20ca545 This closes #2885
1d1f062 is described below
commit 1d1f0625db80538cc73d11f06c8989602d3e820a
Author: Wei Yang <[email protected]>
AuthorDate: Thu Nov 7 14:53:50 2019 +0800
ARTEMIS-2544 Remove rolled-back PageTransactionInfo to free up memory
---
.../core/paging/impl/PageTransactionInfoImpl.java | 5 +-
.../tests/integration/paging/PagingTest.java | 61 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 1 deletion(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index 1b92b90..4684b8b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -113,7 +113,8 @@ public final class PageTransactionInfoImpl implements
PageTransactionInfo {
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.pageTxDeleteError(e, recordID);
}
-
+ }
+ if (pagingManager != null) {
pagingManager.removeTransaction(this.transactionID);
}
return false;
@@ -242,6 +243,7 @@ public final class PageTransactionInfoImpl implements
PageTransactionInfo {
if (lateDeliveries != null) {
for (LateDelivery pos : lateDeliveries) {
pos.getSubscription().lateDeliveryRollback(pos.getPagePosition());
+ onUpdate(1, null,
pos.getSubscription().getPagingStore().getPagingManager());
}
lateDeliveries = null;
}
@@ -283,6 +285,7 @@ public final class PageTransactionInfoImpl implements
PageTransactionInfo {
logger.trace("rolled back, position ignored on " + cursor + ",
position=" + cursorPos);
}
cursor.positionIgnored(cursorPos);
+ onUpdate(1, null, cursor.getPagingStore().getPagingManager());
return true;
} else {
if (logger.isTraceEnabled()) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index c020fbe..cda5311 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -70,6 +70,7 @@ import
org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
@@ -6838,6 +6839,66 @@ public class PagingTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testRollbackPageTransactionBeforeDelivery() throws Exception {
+ testRollbackPageTransaction(true);
+ }
+
+ @Test
+ public void testRollbackPageTransactionAfterDelivery() throws Exception {
+ testRollbackPageTransaction(false);
+ }
+
+ private void testRollbackPageTransaction(boolean rollbackBeforeDelivery)
throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig();
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX);
+
+ server.start();
+
+ final int numberOfMessages = 2;
+
+
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(null, null, false, false, true,
false, 0);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ queue.getPageSubscription().getPagingStore().startPaging();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ if (rollbackBeforeDelivery) {
+ sendMessages(session, producer, numberOfMessages);
+ session.rollback();
+ assertEquals(server.getPagingManager().getTransactions().size(), 1);
+ PageTransactionInfo pageTransactionInfo =
server.getPagingManager().getTransactions().values().iterator().next();
+ // Make sure rollback happens before delivering messages
+ Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ session.start();
+ Assert.assertNull(consumer.receiveImmediate());
+ assertTrue(server.getPagingManager().getTransactions().isEmpty());
+ } else {
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ session.start();
+ sendMessages(session, producer, numberOfMessages);
+ Assert.assertNull(consumer.receiveImmediate());
+ assertEquals(server.getPagingManager().getTransactions().size(), 1);
+ PageTransactionInfo pageTransactionInfo =
server.getPagingManager().getTransactions().values().iterator().next();
+ session.rollback();
+ Wait.assertTrue(() -> pageTransactionInfo.isRollback(), 1000, 100);
+ assertTrue(server.getPagingManager().getTransactions().isEmpty());
+ }
+
+ session.close();
+ }
+
@Override
protected Configuration createDefaultConfig(final int serverID, final
boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);