ARTEMIS-524 Paging could lose data eventually after crashes https://issues.apache.org/jira/browse/ARTEMIS-524
I am keeping all the debug ad tracing I added during the debug of this issue, for that reason this commit may look longer than expected The fix will be highlited by the tests added on org.apache.activemq.artemis.tests.integration.client.PagingTest Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3e2adf12 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3e2adf12 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3e2adf12 Branch: refs/heads/master Commit: 3e2adf123b96c3dfade3d1584ea0ddf65a876941 Parents: ec52693 Author: Clebert Suconic <[email protected]> Authored: Tue May 17 15:06:02 2016 -0400 Committer: Clebert Suconic <[email protected]> Committed: Tue May 17 20:28:40 2016 -0400 ---------------------------------------------------------------------- .../core/client/impl/ClientConsumerImpl.java | 90 ++++++- .../core/impl/ActiveMQConsumerContext.java | 7 + .../core/protocol/core/impl/ChannelImpl.java | 3 + .../artemis/utils/SoftValueHashMap.java | 1 + .../jms/client/ActiveMQMessageConsumer.java | 2 + .../artemis/jms/client/JMSExceptionHelper.java | 10 + .../jms/client/JMSMessageListenerWrapper.java | 3 + .../artemis/core/paging/PagingStoreFactory.java | 5 + .../core/paging/cursor/NonExistentPage.java | 43 ++++ .../core/paging/cursor/PageCursorProvider.java | 3 + .../core/paging/cursor/PageSubscription.java | 2 +- .../paging/cursor/impl/LivePageCacheImpl.java | 6 +- .../cursor/impl/PageCursorProviderImpl.java | 91 ++++--- .../cursor/impl/PageSubscriptionImpl.java | 53 +++- .../activemq/artemis/core/paging/impl/Page.java | 4 +- .../paging/impl/PageTransactionInfoImpl.java | 20 ++ .../core/paging/impl/PagingStoreFactoryNIO.java | 7 + .../core/paging/impl/PagingStoreImpl.java | 11 +- .../impl/journal/JournalStorageManager.java | 6 +- .../impl/journal/LargeServerMessageImpl.java | 15 +- .../postoffice/impl/DuplicateIDCacheImpl.java | 2 +- .../server/impl/RemotingServiceImpl.java | 2 + .../core/server/impl/ActiveMQServerImpl.java | 8 +- .../artemis/core/server/impl/RefsOperation.java | 5 + .../core/server/impl/ServerConsumerImpl.java | 8 + .../tests/integration/client/PagingTest.java | 254 +++++++++++++++++++ .../core/paging/impl/PagingStoreImplTest.java | 14 +- 27 files changed, 605 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 7b72188..57bb869 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -171,6 +171,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { this.contextClassLoader = contextClassLoader; this.flowControlExecutor = flowControlExecutor; + + if (logger.isTraceEnabled()) { + logger.trace(this + ":: being created at", new Exception("trace")); + } } // ClientConsumer implementation @@ -182,9 +186,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { } private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException { + if (logger.isTraceEnabled()) { + logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ")"); + } + checkClosed(); if (largeMessageReceived != null) { + if (logger.isTraceEnabled()) { + logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> discard LargeMessage body for " + largeMessageReceived); + } // Check if there are pending packets to be received largeMessageReceived.discardBody(); largeMessageReceived = null; @@ -195,10 +206,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { } if (handler != null) { + if (logger.isTraceEnabled()) { + logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> throwing messageHandlerSet"); + } throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet(); } if (clientWindowSize == 0) { + if (logger.isTraceEnabled()) { + logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> start slowConsumer"); + } startSlowConsumer(); } @@ -235,6 +252,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { } } + if ( m != null) { + session.workDone(); + } + try { wait(toWait); } @@ -256,6 +277,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { if (failedOver) { if (m == null) { + if (logger.isTraceEnabled()) { + logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> m == null and failover"); + } + // if failed over and the buffer is null, we reset the state and try it again failedOver = false; deliveryForced = false; @@ -263,13 +288,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { continue; } else { + if (logger.isTraceEnabled()) { + logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> failedOver, but m != null, being " + m); + } failedOver = false; } } if (callForceDelivery) { if (logger.isTraceEnabled()) { - logger.trace("Forcing delivery"); + logger.trace(this + "::Forcing delivery"); } // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks sessionContext.forceDelivery(this, forceDeliveryCount++); @@ -291,14 +319,14 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { resetIfSlowConsumer(); if (logger.isTraceEnabled()) { - logger.trace("There was nothing on the queue, leaving it now:: returning null"); + logger.trace(this + "::There was nothing on the queue, leaving it now:: returning null"); } return null; } else { if (logger.isTraceEnabled()) { - logger.trace("Ignored force delivery answer as it belonged to another call"); + logger.trace(this + "::Ignored force delivery answer as it belonged to another call"); } // Ignore the message continue; @@ -331,14 +359,14 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { } if (logger.isTraceEnabled()) { - logger.trace("Returning " + m); + logger.trace(this + "::Returning " + m); } return m; } else { if (logger.isTraceEnabled()) { - logger.trace("Returning null"); + logger.trace(this + "::Returning null"); } resetIfSlowConsumer(); return null; @@ -352,12 +380,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { @Override public ClientMessage receive(final long timeout) throws ActiveMQException { + + if (logger.isTraceEnabled()) { + logger.trace(this + ":: receive(" + timeout + ")"); + } ClientMessage msg = receive(timeout, false); if (msg == null && !closed) { + if (logger.isTraceEnabled()) { + logger.trace(this + ":: receive(" + timeout + ") -> null, trying again with receive(0)"); + } msg = receive(0, true); } + if (logger.isTraceEnabled()) { + logger.trace(this + ":: returning " + msg); + } + return msg; } @@ -471,6 +510,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { @Override public void clearAtFailover() { + if (logger.isTraceEnabled()) { + logger.trace(this + "::ClearAtFailover"); + } clearBuffer(); // failover will issue a start later @@ -647,7 +689,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { } if (currentLargeMessageController == null) { if (logger.isTraceEnabled()) { - logger.trace("Sending back credits for largeController = null " + flowControlSize); + logger.trace(this + "::Sending back credits for largeController = null " + flowControlSize); } flowControl(flowControlSize, false); } @@ -722,12 +764,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { individualAcknowledge(message); } else { + ackBytes += message.getEncodeSize(); + if (logger.isTraceEnabled()) { + logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize()); + } + if (ackBytes >= ackBatchSize) { + if (logger.isTraceEnabled()) { + logger.trace(this + ":: acknowledge acking " + cmi); + } doAck(cmi); } else { + if (logger.isTraceEnabled()) { + logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi); + } lastAckedMessage = cmi; } } @@ -745,6 +798,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { @Override public void flushAcks() throws ActiveMQException { if (lastAckedMessage != null) { + if (logger.isTraceEnabled()) { + logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage); + } doAck(lastAckedMessage); } } @@ -763,7 +819,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { if (creditsToSend >= clientWindowSize) { if (clientWindowSize == 0 && discountSlowConsumer) { if (logger.isTraceEnabled()) { - logger.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer"); + logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer"); } // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be @@ -810,7 +866,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { */ private void startSlowConsumer() { if (logger.isTraceEnabled()) { - logger.trace("Sending 1 credit to start delivering of one message to slow consumer"); + logger.trace(this + "::Sending 1 credit to start delivering of one message to slow consumer"); } sendCredits(1); try { @@ -855,7 +911,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { private void queueExecutor() { if (logger.isTraceEnabled()) { - logger.trace("Adding Runner on Executor for delivery"); + logger.trace(this + "::Adding Runner on Executor for delivery"); } sessionExecutor.execute(runner); @@ -946,7 +1002,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { if (!expired) { if (logger.isTraceEnabled()) { - logger.trace("Calling handler.onMessage"); + logger.trace(this + "::Calling handler.onMessage"); } final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { @Override @@ -981,7 +1037,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { } if (logger.isTraceEnabled()) { - logger.trace("Handler.onMessage done"); + logger.trace(this + "::Handler.onMessage done"); } if (message.isLargeMessage()) { @@ -1065,9 +1121,21 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { lastAckedMessage = null; + if (logger.isTraceEnabled()) { + logger.trace(this + "::Acking message " + message); + } + session.acknowledge(this, message); } + @Override + public String toString() { + return super.toString() + "{" + + "consumerContext=" + consumerContext + + ", queueName=" + queueName + + '}'; + } + // Inner classes // -------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java index 08abb91..65540ee 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java @@ -46,6 +46,13 @@ public class ActiveMQConsumerContext extends ConsumerContext { } @Override + public String toString() { + return "ActiveMQConsumerContext{" + + "id=" + id + + '}'; + } + + @Override public int hashCode() { return (int) (id ^ (id >>> 32)); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 957a3a9..b4ac75d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -363,6 +363,9 @@ public final class ChannelImpl implements Channel { checkReconnectID(reconnectID); + if (logger.isTraceEnabled()) { + logger.trace("Sending blocking " + packet); + } connection.getTransportConnection().write(buffer, false, false); long toWait = connection.getBlockingCallTimeout(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java index 6428c8a..b499910 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java @@ -316,6 +316,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen private void processQueue() { AggregatedSoftReference ref = null; while ((ref = (AggregatedSoftReference) this.refQueue.poll()) != null) { + logger.tracef("Removing reference through processQueue:: %s", ref.get()); mapDelegate.remove(ref.key); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index 04e4f41..8929fa5 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -27,12 +27,14 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; /** * ActiveMQ Artemis implementation of a JMS MessageConsumer. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java index 666fa9d..1a8456f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java @@ -22,9 +22,19 @@ import javax.jms.JMSException; import javax.jms.JMSSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; public final class JMSExceptionHelper { + public static JMSException convertFromActiveMQException(final ActiveMQInterruptedException me) { + JMSException je = new javax.jms.IllegalStateException(me.getMessage()); + + je.setStackTrace(me.getStackTrace()); + + je.initCause(me); + return je; + } + public static JMSException convertFromActiveMQException(final ActiveMQException me) { JMSException je; switch (me.getType()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java index ab62dbc..af5b158 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; public class JMSMessageListenerWrapper implements MessageHandler { @@ -83,6 +84,7 @@ public class JMSMessageListenerWrapper implements MessageHandler { message.acknowledge(); } catch (ActiveMQException e) { + ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly(); ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); } } @@ -122,6 +124,7 @@ public class JMSMessageListenerWrapper implements MessageHandler { } } catch (ActiveMQException e) { + ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly(); ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java index 8c2d11a..7c52c63 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java @@ -17,9 +17,12 @@ package org.apache.activemq.artemis.core.paging; import java.util.List; +import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -30,6 +33,8 @@ public interface PagingStoreFactory { PagingStore newStore(SimpleString address, AddressSettings addressSettings); + PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor); + void stop() throws InterruptedException; void setPagingManager(PagingManager manager); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java new file mode 100644 index 0000000..73a22ce --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.paging.cursor; + +/** This is an internal exception. + * In certain cases AfterCommit could try to decrease the reference counting on large messages. + * But if the whole page is cleaned an exception could happen, which is ok on that path, and we need to identify it. */ +public class NonExistentPage extends RuntimeException { + + public NonExistentPage() { + } + + public NonExistentPage(String message) { + super(message); + } + + public NonExistentPage(String message, Throwable cause) { + super(message, cause); + } + + public NonExistentPage(Throwable cause) { + super(cause); + } + + public NonExistentPage(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index 951b83c..b2a6aff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -24,6 +24,9 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; */ public interface PageCursorProvider { + /** Used on tests, to simulate a scenario where the VM cleared space */ + void clearCache(); + PageCache getPageCache(long pageNr); PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index df2ccc3..89c6d44 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -93,7 +93,7 @@ public interface PageSubscription { */ void reloadACK(PagePosition position); - void reloadPageCompletion(PagePosition position); + void reloadPageCompletion(PagePosition position) throws Exception; void reloadPageInfo(long pageNr); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java index 29d990a..b964b56 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java @@ -23,13 +23,16 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.jboss.logging.Logger; /** * This is the same as PageCache, however this is for the page that's being currently written. */ public class LivePageCacheImpl implements LivePageCache { - private final List<PagedMessage> messages = new LinkedList<>(); + private static final Logger logger = Logger.getLogger(LivePageCacheImpl.class); + + private final List<PagedMessage> messages = new LinkedList<PagedMessage>(); private final Page page; @@ -82,6 +85,7 @@ public class LivePageCacheImpl implements LivePageCache { @Override public synchronized void close() { + logger.tracef("Closing %s", this); this.isLive = false; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 9862a1f..4f3a6a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage; import org.apache.activemq.artemis.core.paging.cursor.PageCache; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.PagePosition; @@ -58,20 +59,20 @@ public class PageCursorProviderImpl implements PageCursorProvider { /** * As an optimization, avoid subsequent schedules as they are unnecessary */ - private final AtomicInteger scheduledCleanup = new AtomicInteger(0); + protected final AtomicInteger scheduledCleanup = new AtomicInteger(0); - private volatile boolean cleanupEnabled = true; + protected volatile boolean cleanupEnabled = true; - private final PagingStore pagingStore; + protected final PagingStore pagingStore; - private final StorageManager storageManager; + protected final StorageManager storageManager; // This is the same executor used at the PageStoreImpl. One Executor per pageStore private final Executor executor; private final SoftValueHashMap<Long, PageCache> softCache; - private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<>(); + private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>(); // Static -------------------------------------------------------- @@ -115,7 +116,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) { // sanity check, this should never happen unless there's a bug - throw new IllegalStateException("Invalid messageNumber passed = " + pos + " on " + cache); + throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache); } return cache.getMessage(pos.getMessageNr()); @@ -146,9 +147,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { cache = createPageCache(pageId); // anyone reading from this cache will have to wait reading to finish first // we also want only one thread reading this cache - if (logger.isTraceEnabled()) { - logger.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress()); - } + logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress()); readPage((int) pageId, cache); softCache.put(pageId, cache); } @@ -186,6 +185,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { @Override public void addPageCache(PageCache cache) { + logger.tracef("Add page cache %s", cache); synchronized (softCache) { softCache.put(cache.getPageId(), cache); } @@ -203,6 +203,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override public void clearCache() { synchronized (softCache) { softCache.clear(); @@ -273,6 +274,9 @@ public class PageCursorProviderImpl implements PageCursorProvider { @Override public void scheduleCleanup() { + if (logger.isTraceEnabled()) { + logger.trace("scheduling cleanup", new Exception("trace")); + } if (!cleanupEnabled || scheduledCleanup.intValue() > 2) { // Scheduled cleanup was already scheduled before.. never mind! // or we have cleanup disabled @@ -286,7 +290,9 @@ public class PageCursorProviderImpl implements PageCursorProvider { public void run() { storageManager.setContext(storageManager.newSingleThreadContext()); try { - cleanup(); + if (cleanupEnabled) { + cleanup(); + } } finally { storageManager.clearContext(); @@ -336,7 +342,10 @@ public class PageCursorProviderImpl implements PageCursorProvider { @Override public void cleanup() { - ArrayList<Page> depagedPages = new ArrayList<>(); + + logger.tracef("performing page cleanup %s", this); + + ArrayList<Page> depagedPages = new ArrayList<Page>(); while (true) { if (pagingStore.lock(100)) { @@ -346,6 +355,8 @@ public class PageCursorProviderImpl implements PageCursorProvider { return; } + logger.tracef("%s locked", this); + synchronized (this) { try { if (!pagingStore.isStarted()) { @@ -356,14 +367,12 @@ public class PageCursorProviderImpl implements PageCursorProvider { return; } - if (logger.isDebugEnabled()) { - logger.debug("Asserting cleanup for address " + this.pagingStore.getAddress()); - } - ArrayList<PageSubscription> cursorList = cloneSubscriptions(); long minPage = checkMinPage(cursorList); + logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage); + // if the current page is being written... // on that case we need to move to verify it in a different way if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) { @@ -376,18 +385,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { // All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark if (complete) { - if (logger.isDebugEnabled()) { - logger.debug("Address " + pagingStore.getAddress() + - " is leaving page mode as all messages are consumed and acknowledged from the page store"); - } - - pagingStore.forceAnotherPage(); - - Page currentPage = pagingStore.getCurrentPage(); - - storeBookmark(cursorList, currentPage); - - pagingStore.stopPaging(); + cleanupComplete(cursorList); } } @@ -423,7 +421,30 @@ public class PageCursorProviderImpl implements PageCursorProvider { pagingStore.unlock(); } } + finishCleanup(depagedPages); + + + } + + // Protected as a way to inject testing + protected void cleanupComplete(ArrayList<PageSubscription> cursorList) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Address " + pagingStore.getAddress() + + " is leaving page mode as all messages are consumed and acknowledged from the page store"); + } + + pagingStore.forceAnotherPage(); + + Page currentPage = pagingStore.getCurrentPage(); + storeBookmark(cursorList, currentPage); + + pagingStore.stopPaging(); + } + + // Protected as a way to inject testing + protected void finishCleanup(ArrayList<Page> depagedPages) { + logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages); try { for (Page depagedPage : depagedPages) { PageCache cache; @@ -433,7 +454,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } if (logger.isTraceEnabled()) { - logger.trace("Removing page " + depagedPage.getPageId() + " from page-cache"); + logger.trace("Removing pageNr=" + depagedPage.getPageId() + " from page-cache"); } if (cache == null) { @@ -479,12 +500,15 @@ public class PageCursorProviderImpl implements PageCursorProvider { } private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) { + + logger.tracef("checkPageCompletion(%d)", minPage); + boolean complete = true; for (PageSubscription cursor : cursorList) { if (!cursor.isComplete(minPage)) { if (logger.isDebugEnabled()) { - logger.debug("Cursor " + cursor + " was considered incomplete at page " + minPage); + logger.debug("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage); } complete = false; @@ -492,7 +516,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } else { if (logger.isDebugEnabled()) { - logger.debug("Cursor " + cursor + "was considered **complete** at page " + minPage); + logger.debug("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage); } } } @@ -545,6 +569,13 @@ public class PageCursorProviderImpl implements PageCursorProvider { } } + @Override + public String toString() { + return "PageCursorProviderImpl{" + + "pagingStore=" + pagingStore + + '}'; + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 57b4efe..440f845 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -191,7 +191,11 @@ final class PageSubscriptionImpl implements PageSubscription { * cursor/subscription. */ @Override - public void reloadPageCompletion(PagePosition position) { + public void reloadPageCompletion(PagePosition position) throws Exception { + // if the current page is complete, we must move it out of the way + if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) { + pageStore.forceAnotherPage(); + } PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null); info.setCompleteInfo(position); synchronized (consumedPages) { @@ -202,6 +206,9 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void scheduleCleanupCheck() { if (autoCleanup) { + if (logger.isTraceEnabled()) { + logger.trace("Scheduling cleanup", new Exception("trace")); + } if (scheduledCleanupCount.get() > 2) { return; } @@ -212,7 +219,9 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void run() { try { - cleanupEntries(false); + if (autoCleanup) { + cleanupEntries(false); + } } catch (Exception e) { ActiveMQServerLogger.LOGGER.problemCleaningCursorPages(e); @@ -242,6 +251,9 @@ final class PageSubscriptionImpl implements PageSubscription { if (completeDelete) { counter.delete(); } + if (logger.isTraceEnabled()) { + logger.trace("cleanupEntries", new Exception("trace")); + } Transaction tx = new TransactionImpl(store); boolean persist = false; @@ -564,17 +576,23 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public boolean isComplete(long page) { + logger.tracef("%s isComplete %d", this, page); synchronized (consumedPages) { if (empty && consumedPages.isEmpty()) { + if (logger.isTraceEnabled()) { + logger.tracef("isComplete(%d)::Subscription %s has empty=%s, consumedPages.isEmpty=%s", (Object)page, this, consumedPages.isEmpty()); + } return true; } PageCursorInfo info = consumedPages.get(page); if (info == null && empty) { + logger.tracef("isComplete(%d)::::Couldn't find info and it is empty", page); return true; } else { + logger.tracef("isComplete(%d)::calling is %s", (Object)page, this, consumedPages.isEmpty()); return info != null && info.isDone(); } } @@ -731,18 +749,18 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void reloadPageInfo(long pageNr) { - getPageInfo(pageNr, true); + getPageInfo(pageNr); } private PageCursorInfo getPageInfo(final PagePosition pos) { - return getPageInfo(pos.getPageNr(), true); + return getPageInfo(pos.getPageNr()); } - private PageCursorInfo getPageInfo(final long pageNr, boolean create) { + private PageCursorInfo getPageInfo(final long pageNr) { synchronized (consumedPages) { PageCursorInfo pageInfo = consumedPages.get(pageNr); - if (create && pageInfo == null) { + if (pageInfo == null) { PageCache cache = cursorProvider.getPageCache(pageNr); if (cache == null) { return null; @@ -814,7 +832,11 @@ final class PageSubscriptionImpl implements PageSubscription { tx.setContainsPersistent(); } - getPageInfo(position).remove(position); + PageCursorInfo info = getPageInfo(position); + + logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", position, info); + + info.remove(position); PageCursorTX cursorTX = (PageCursorTX) tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS); @@ -897,16 +919,17 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public String toString() { try { - return "PageCursorInfo::PageID=" + pageId + + return "PageCursorInfo::pageNr=" + pageId + " numberOfMessage = " + numberOfMessages + ", confirmed = " + confirmed + ", isDone=" + - this.isDone(); + this.isDone() + + " wasLive = " + wasLive; } catch (Exception e) { - return "PageCursorInfo::PageID=" + pageId + + return "PageCursorInfo::pageNr=" + pageId + " numberOfMessage = " + numberOfMessages + ", confirmed = " + @@ -917,6 +940,7 @@ final class PageSubscriptionImpl implements PageSubscription { } public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) { + logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s", pageId, numberOfMessages, cache); this.pageId = pageId; this.numberOfMessages = numberOfMessages; if (cache != null) { @@ -932,6 +956,7 @@ final class PageSubscriptionImpl implements PageSubscription { * @param completePage */ public void setCompleteInfo(final PagePosition completePage) { + logger.tracef("Setting up complete page %s on cursor %s on subscription %s", completePage, this, PageSubscriptionImpl.this); this.completePage = completePage; } @@ -940,6 +965,10 @@ final class PageSubscriptionImpl implements PageSubscription { } public boolean isDone() { + if (logger.isTraceEnabled()) { + logger.trace(PageSubscriptionImpl.this + "::PageCursorInfo(" + pageId + ")::isDone checking with completePage!=null->" + (completePage != null) + " getNumberOfMessages=" + getNumberOfMessages() + ", confirmed=" + confirmed.get() + " and pendingTX=" + pendingTX.get()); + + } return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0); } @@ -983,7 +1012,7 @@ final class PageSubscriptionImpl implements PageSubscription { " confirmed = " + (confirmed.get() + 1) + " pendingTX = " + pendingTX + - ", page = " + + ", pageNr = " + pageId + " posACK = " + posACK); } catch (Throwable ignored) { @@ -1189,7 +1218,7 @@ final class PageSubscriptionImpl implements PageSubscription { ignored = true; } - PageCursorInfo info = getPageInfo(message.getPosition().getPageNr(), false); + PageCursorInfo info = getPageInfo(message.getPosition().getPageNr()); if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) { continue; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 83a6c53..0888416 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -251,7 +251,7 @@ public final class Page implements Comparable<Page> { } if (logger.isDebugEnabled()) { - logger.debug("Deleting pageId=" + pageId + " on store " + storeName); + logger.debug("Deleting pageNr=" + pageId + " on store " + storeName); } if (messages != null) { @@ -294,7 +294,7 @@ public final class Page implements Comparable<Page> { @Override public String toString() { - return "Page::pageID=" + this.pageId + ", file=" + this.file; + return "Page::pageNr=" + this.pageId + ", file=" + this.file; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java index 55569b2..1502855 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java @@ -34,12 +34,15 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.utils.DataConstants; +import org.jboss.logging.Logger; public final class PageTransactionInfoImpl implements PageTransactionInfo { // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- + private static final Logger logger = Logger.getLogger(PageTransactionInfoImpl.class); + private long transactionID; private volatile long recordID = -1; @@ -239,19 +242,36 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { public synchronized boolean deliverAfterCommit(PageIterator iterator, PageSubscription cursor, PagePosition cursorPos) { + + if (logger.isTraceEnabled()) { + logger.trace("deliver after commit on " + cursor + ", position=" + cursorPos); + } + if (committed && useRedelivery) { + if (logger.isTraceEnabled()) { + logger.trace("commit & useRedelivery on " + cursor + ", position=" + cursorPos); + } cursor.addPendingDelivery(cursorPos); cursor.redeliver(iterator, cursorPos); return true; } else if (committed) { + if (logger.isTraceEnabled()) { + logger.trace("committed on " + cursor + ", position=" + cursorPos + ", ignoring position"); + } return false; } else if (rolledback) { + if (logger.isTraceEnabled()) { + logger.trace("rolled back, position ignored on " + cursor + ", position=" + cursorPos); + } cursor.positionIgnored(cursorPos); return true; } else { + if (logger.isTraceEnabled()) { + logger.trace("deliverAftercommit/else, marking useRedelivery on " + cursor + ", position " + cursorPos); + } useRedelivery = true; if (lateDeliveries == null) { lateDeliveries = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index 0f36a31..00da382 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -26,6 +26,7 @@ import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.SimpleString; @@ -35,6 +36,8 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; +import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -92,6 +95,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { } @Override + public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) { + return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); + } + public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) { return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index a7baf84..8fec06c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.paging.PagingStoreFactory; import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; import org.apache.activemq.artemis.core.paging.cursor.impl.LivePageCacheImpl; -import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.replication.ReplicationManager; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; @@ -69,7 +68,7 @@ import org.jboss.logging.Logger; */ public class PagingStoreImpl implements PagingStore { - private static final Logger logger = Logger.getLogger(Page.class); + private static final Logger logger = Logger.getLogger(PagingStoreImpl.class); private final SimpleString address; @@ -173,7 +172,7 @@ public class PagingStoreImpl implements PagingStore { this.syncTimer = null; } - this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize()); + this.cursorProvider = storeFactory.newCursorProvider(this, this.storageManager, addressSettings, executor); } @@ -831,7 +830,7 @@ public class PagingStoreImpl implements PagingStore { if (logger.isTraceEnabled()) { logger.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() + - " pageId=" + currentPage.getPageId()); + " pageNr=" + currentPage.getPageId()); } return true; @@ -1021,6 +1020,10 @@ public class PagingStoreImpl implements PagingStore { int tmpCurrentPageId = currentPageId + 1; + if (logger.isTraceEnabled()) { + logger.trace("new pageNr=" + tmpCurrentPageId, new Exception("trace")); + } + if (currentPage != null) { currentPage.close(true); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index acdf57b..1379308 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -543,13 +543,15 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } bindingsJournal = new ReplicatedJournal(((byte) 0), originalBindingsJournal, replicator); messageJournal = new ReplicatedJournal((byte) 1, originalMessageJournal, replicator); + + // We need to send the list while locking otherwise part of the body might get sent too soon + // it will send a list of IDs that we are allocating + replicator.sendLargeMessageIdListMessage(pendingLargeMessages); } finally { storageManagerLock.writeLock().unlock(); } - // it will send a list of IDs that we are allocating - replicator.sendLargeMessageIdListMessage(pendingLargeMessages); sendJournalFile(messageFiles, JournalContent.MESSAGES); sendJournalFile(bindingsFiles, JournalContent.BINDINGS); sendLargeMessageFiles(pendingLargeMessages); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index be193eb..578db6b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -340,11 +340,22 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L @Override public String toString() { - return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() + - ",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" + + return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this); } + private static String toDate(long timestamp) { + if (timestamp == 0) { + return "0"; + } + else { + return new java.util.Date(timestamp).toString(); + } + + } + + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index 29774d6..7f35638 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -222,7 +222,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } else { if (logger.isTraceEnabled()) { - logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID)); + logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx); } // For a tx, it's important that the entry is not added to the cache until commit // since if the client fails then resends them tx we don't want it to get rejected http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 9b3329a..3672fe2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -176,6 +176,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif return; } + logger.tracef("Starting remoting service %s", this); + paused = false; // The remoting service maintains it's own thread pool for handling remoting traffic http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 60235de..50437a1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1671,9 +1671,13 @@ public class ActiveMQServerImpl implements ActiveMQServer { this.queueFactory = factory; } - private PagingManager createPagingManager() { + protected PagingManager createPagingManager() { - return new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO), addressSettingsRepository); + return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository); + } + + protected PagingStoreFactoryNIO getPagingStoreFactory() { + return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO); } /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 3cdaa66..1f5c74c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -22,6 +22,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; @@ -165,6 +166,10 @@ public class RefsOperation extends TransactionOperationAbstract { try { refmsg.getMessage().decrementRefCount(); } + catch (NonExistentPage e) { + // This could happen on after commit, since the page could be deleted on file earlier by another thread + logger.debug(e); + } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 0224c7d..ae1f5b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -854,7 +854,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { boolean startedTransaction = false; + + if (logger.isTraceEnabled()) { + logger.trace("individualACK messageID=" + messageID); + } + if (tx == null) { + if (logger.isTraceEnabled()) { + logger.trace("individualACK starting new TX"); + } startedTransaction = true; tx = new TransactionImpl(storageManager); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java index 4f64f42..f658fae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java @@ -21,6 +21,7 @@ import javax.transaction.xa.Xid; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -58,7 +60,11 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; +import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO; import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe; import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal; import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe; @@ -70,14 +76,18 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.logging.Logger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class PagingTest extends ActiveMQTestBase { + private static final Logger logger = Logger.getLogger(PagingTest.class); + private ServerLocator locator; private ActiveMQServer server; private ClientSessionFactory sf; @@ -2914,6 +2924,250 @@ public class PagingTest extends ActiveMQTestBase { session.close(); } + + @Test + public void testRollbackOnSendThenSendMore() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>()); + + server.start(); + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + ClientSession session = sf.createSession(null, null, false, false, true, false, 0); + + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); + + Queue queue = server.locateQueue(ADDRESS); + + queue.getPageSubscription().getPagingStore().startPaging(); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + ClientMessage message; + + for (int i = 0; i < 20; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(new byte[100 * 4]); + + message.putIntProperty(new SimpleString("id"), i); + + producer.send(message); + session.commit(); + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + + } + + for (int i = 20; i < 24; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(new byte[100 * 4]); + + message.putIntProperty(new SimpleString("id"), i); + + producer.send(message); + } + + session.rollback(); + + ClientSession consumerSession = sf.createSession(false, false); + + + queue.getPageSubscription().getPagingStore().disableCleanup(); + + queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); + + consumerSession.start(); + ClientConsumer consumer = consumerSession.createConsumer(ADDRESS, SimpleString.toSimpleString("id > 0")); + for (int i = 0; i < 19; i++) { + ClientMessage messageRec = consumer.receive(5000); + System.err.println("msg::" + messageRec); + Assert.assertNotNull(messageRec); + messageRec.acknowledge(); + consumerSession.commit(); + + // The only reason I'm calling cleanup directly is that it would be easy to debug in case of bugs + // if you see an issue with cleanup here, enjoy debugging this method + queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); + } + queue.getPageSubscription().getPagingStore().enableCleanup(); + + consumerSession.close(); + + + session.close(); + sf.close(); + + + server.stop(); + } + + // The pages are complete, and this is simulating a scenario where the server crashed before deleting the pages. + @Test + public void testRestartWithComplete() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig(); + + final AtomicBoolean mainCleanup = new AtomicBoolean(true); + + class InterruptedCursorProvider extends PageCursorProviderImpl { + + public InterruptedCursorProvider(PagingStore pagingStore, + StorageManager storageManager, + Executor executor, + int maxCacheSize) { + super(pagingStore, storageManager, executor, maxCacheSize); + } + + @Override + public void cleanup() { + if (mainCleanup.get()) { + super.cleanup(); + } + else { + try { + pagingStore.unlock(); + } + catch (Throwable ignored) { + } + } + } + } + + server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) { + @Override + protected PagingStoreFactoryNIO getPagingStoreFactory() { + return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) { + @Override + public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) { + return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); + } + }; + } + + }; + + addServer(server); + + AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes( PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + + server.getAddressSettingsRepository().addMatch("#", defaultSetting); + + server.start(); + + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + ClientSession session = sf.createSession(true, true, 0); + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); + + Queue queue = server.locateQueue(ADDRESS); + + queue.getPageSubscription().getPagingStore().startPaging(); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + ClientMessage message; + + for (int i = 0; i < 20; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(new byte[100 * 4]); + + message.putIntProperty(new SimpleString("idi"), i); + + producer.send(message); + session.commit(); + if (i < 19) { + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + + } + + Assert.assertEquals(20, queue.getPageSubscription().getPagingStore().getCurrentWritingPage()); + + // This will force a scenario where the pages are cleaned up. When restarting we need to check if the current page is complete + // if it is complete we must move to another page avoiding races on cleanup + // which could happen during a crash / restart + long tx = server.getStorageManager().generateID(); + for (int i = 1; i <= 20; i++) { + server.getStorageManager().storePageCompleteTransactional(tx, queue.getID(), new PagePositionImpl(i, 1)); + } + + server.getStorageManager().commit(tx); + + session.close(); + sf.close(); + + server.stop(); + mainCleanup.set(false); + + logger.trace("Server restart"); + + server.start(); + + queue = server.locateQueue(ADDRESS); + + locator = createInVMNonHALocator(); + sf = createSessionFactory(locator); + session = sf.createSession(null, null, false, false, true, false, 0); + producer = session.createProducer(PagingTest.ADDRESS); + + for (int i = 0; i < 10; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(new byte[100 * 4]); + + message.putIntProperty(new SimpleString("newid"), i); + + producer.send(message); + session.commit(); + + if (i == 5) { + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + } + + + mainCleanup.set(true); + + queue = server.locateQueue(ADDRESS); + queue.getPageSubscription().cleanupEntries(false); + queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup(); + + + ClientConsumer consumer = session.createConsumer(ADDRESS); + session.start(); + + for (int i = 0; i < 10; i++) { + message = consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("newid").intValue()); + message.acknowledge(); + } + + server.stop(); + + // Thread.sleep(5000); + + + + } + @Test public void testCommitOnSend() throws Exception { clearDataRecreateServerDirs(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index d16da9f..498beb4 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -42,6 +42,8 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; +import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider; +import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl; @@ -105,7 +107,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { public void testDoubleStart() throws Exception { SequentialFileFactory factory = new FakeSequentialFileFactory(); - PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true); + PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, new FakeStoreFactory(factory), PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true); storeImpl.start(); @@ -160,7 +162,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { storeImpl.sync(); - storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true); + storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true); storeImpl.start(); @@ -809,6 +811,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase { } @Override + public PageCursorProvider newCursorProvider(PagingStore store, + StorageManager storageManager, + AddressSettings addressSettings, + Executor executor) { + return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); + } + + @Override public void setPagingManager(final PagingManager manager) { }
