ARTEMIS-332 - Duplicate delivery over Bridges under OME scenarios, paging and other failures
https://issues.apache.org/jira/browse/ARTEMIS-332 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/96849a42 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/96849a42 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/96849a42 Branch: refs/heads/master Commit: 96849a42b756b3ba4f8ab65fff668893c59fc39f Parents: 34b6635 Author: Clebert Suconic <[email protected]> Authored: Mon Jan 4 19:49:27 2016 -0500 Committer: Clebert Suconic <[email protected]> Committed: Mon Jan 4 20:49:28 2016 -0500 ---------------------------------------------------------------------- .../apache/activemq/artemis/utils/ByteUtil.java | 37 ++- .../activemq/artemis/utils/ByteUtilTest.java | 8 + .../core/protocol/core/impl/ChannelImpl.java | 4 + .../core/io/IOCriticalErrorListener.java | 2 +- .../openwire/amq/AMQServerConsumer.java | 37 +-- .../core/management/impl/QueueControlImpl.java | 4 +- .../artemis/core/paging/PagingStore.java | 2 + .../artemis/core/paging/PagingStoreFactory.java | 2 + .../artemis/core/paging/cursor/PageCache.java | 11 - .../core/paging/cursor/PageCursorProvider.java | 5 +- .../core/paging/cursor/PageSubscription.java | 5 +- .../core/paging/cursor/PagedReference.java | 3 +- .../core/paging/cursor/PagedReferenceImpl.java | 36 ++- .../paging/cursor/impl/LivePageCacheImpl.java | 10 - .../core/paging/cursor/impl/PageCacheImpl.java | 37 +-- .../cursor/impl/PageCursorProviderImpl.java | 61 +++-- .../cursor/impl/PageSubscriptionImpl.java | 147 +++++++---- .../core/paging/impl/PagingStoreFactoryNIO.java | 4 + .../core/paging/impl/PagingStoreImpl.java | 6 + .../core/persistence/StorageManager.java | 2 + .../impl/journal/JournalStorageManager.java | 25 +- .../impl/nullpm/NullStorageManager.java | 23 +- .../core/postoffice/DuplicateIDCache.java | 2 + .../artemis/core/postoffice/PostOffice.java | 2 +- .../postoffice/impl/DuplicateIDCacheImpl.java | 93 ++++++- .../core/postoffice/impl/PostOfficeImpl.java | 36 +-- .../core/ServerSessionPacketHandler.java | 90 ++++--- .../artemis/core/server/MessageReference.java | 4 +- .../activemq/artemis/core/server/Queue.java | 7 +- .../core/server/ScheduledDeliveryHandler.java | 5 +- .../artemis/core/server/ServerSession.java | 2 + .../cluster/impl/ClusterConnectionImpl.java | 2 +- .../core/server/impl/ActiveMQServerImpl.java | 30 +-- .../core/server/impl/LastValueQueue.java | 79 ++++-- .../core/server/impl/MessageReferenceImpl.java | 2 +- .../artemis/core/server/impl/QueueImpl.java | 87 ++++--- .../artemis/core/server/impl/RefsOperation.java | 27 +- .../impl/ScheduledDeliveryHandlerImpl.java | 5 +- .../core/server/impl/ServerConsumerImpl.java | 62 ++++- .../core/server/impl/ServerSessionImpl.java | 15 ++ .../impl/ScheduledDeliveryHandlerTest.java | 4 +- .../transaction/impl/TransactionImplTest.java | 5 + .../integration/DuplicateDetectionTest.java | 249 ++++++------------- .../storage/PersistMultiThreadTest.java | 4 + .../core/paging/impl/PagingStoreImplTest.java | 4 + .../impl/DuplicateDetectionUnitTest.java | 2 +- .../core/server/impl/fakes/FakeConsumer.java | 60 +++-- .../core/server/impl/fakes/FakePostOffice.java | 3 +- 48 files changed, 788 insertions(+), 564 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index b7ff841..50ac3cc 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -19,9 +19,12 @@ package org.apache.activemq.artemis.utils; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.SimpleString; public class ByteUtil { + public static final String NON_ASCII_STRING = "@@@@@"; + private static final char[] hexArray = "0123456789ABCDEF".toCharArray(); public static String maxString(String value, int size) { @@ -34,22 +37,30 @@ public class ByteUtil { } public static String bytesToHex(byte[] bytes, int groupSize) { - if (bytes == null) { - return "null"; + char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)]; + int outPos = 0; + for (int j = 0; j < bytes.length; j++) { + if (j > 0 && j % groupSize == 0) { + hexChars[outPos++] = ' '; + } + int v = bytes[j] & 0xFF; + hexChars[outPos++] = hexArray[v >>> 4]; + hexChars[outPos++] = hexArray[v & 0x0F]; } - else { - char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)]; - int outPos = 0; - for (int j = 0; j < bytes.length; j++) { - if (j > 0 && j % groupSize == 0) { - hexChars[outPos++] = ' '; - } - int v = bytes[j] & 0xFF; - hexChars[outPos++] = hexArray[v >>> 4]; - hexChars[outPos++] = hexArray[v & 0x0F]; + return new String(hexChars); + } + + public static String toSimpleString(byte[] bytes) { + SimpleString simpleString = new SimpleString(bytes); + String value = simpleString.toString(); + + for (char c : value.toCharArray()) { + if (c < ' ' || c > 127) { + return NON_ASCII_STRING; } - return new String(hexChars); } + + return value; } private static int numberOfGroups(byte[] bytes, int groupSize) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java index ada02f4..feebae1 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java @@ -31,6 +31,14 @@ public class ByteUtilTest { } @Test + public void testNonASCII() { + Assert.assertEquals("aA", ByteUtil.toSimpleString(new byte[]{97, 0, 65, 0})); + Assert.assertEquals(ByteUtil.NON_ASCII_STRING, ByteUtil.toSimpleString(new byte[]{0, 97, 0, 65})); + + System.out.println(ByteUtil.toSimpleString(new byte[]{0, 97, 0, 65})); + } + + @Test public void testMaxString() { byte[] byteArray = new byte[20 * 1024]; System.out.println(ByteUtil.maxString(ByteUtil.bytesToHex(byteArray, 2), 150)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 9e5ccb3..4ef6104 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -536,6 +536,10 @@ public final class ChannelImpl implements Channel { if (resendCache != null && packet.isRequiresConfirmations()) { lastConfirmedCommandID.incrementAndGet(); + if (isTrace) { + ActiveMQClientLogger.LOGGER.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID); + } + receivedBytes += packet.getPacketSize(); if (receivedBytes >= confWindowSize) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java index 1a38462..c0e180e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCriticalErrorListener.java @@ -21,5 +21,5 @@ package org.apache.activemq.artemis.core.io; */ public interface IOCriticalErrorListener { - void onIOException(Exception code, String message, SequentialFile file); + void onIOException(Throwable code, String message, SequentialFile file); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java index b0ec7ed..625adcd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.util.List; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.QueueBinding; @@ -125,24 +126,30 @@ public class AMQServerConsumer extends ServerConsumerImpl { } public void amqPutBackToDeliveringList(final List<MessageReference> refs) { - synchronized (this.deliveringRefs) { - for (MessageReference ref : refs) { - ref.incrementDeliveryCount(); - deliveringRefs.add(ref); - } - //adjust the order. Suppose deliveringRefs has 2 existing - //refs m1, m2, and refs has 3 m3, m4, m5 - //new order must be m3, m4, m5, m1, m2 - if (refs.size() > 0) { - long first = refs.get(0).getMessage().getMessageID(); - MessageReference m = deliveringRefs.peek(); - while (m.getMessage().getMessageID() != first) { - deliveringRefs.poll(); - deliveringRefs.add(m); - m = deliveringRefs.peek(); + try { + synchronized (this.deliveringRefs) { + for (MessageReference ref : refs) { + ref.incrementDeliveryCount(); + deliveringRefs.add(ref); + } + //adjust the order. Suppose deliveringRefs has 2 existing + //refs m1, m2, and refs has 3 m3, m4, m5 + //new order must be m3, m4, m5, m1, m2 + if (refs.size() > 0) { + long first = refs.get(0).getMessage().getMessageID(); + MessageReference m = deliveringRefs.peek(); + while (m.getMessage().getMessageID() != first) { + deliveringRefs.poll(); + deliveringRefs.add(m); + m = deliveringRefs.peek(); + } } } } + catch (ActiveMQException e) { + // TODO: what to do here? + throw new IllegalStateException(e.getMessage(), e); + } } public void moveToDeadLetterAddress(long mid, Throwable cause) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index b61409e..26d22a0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -339,7 +339,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { * @param refs * @return */ - private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) { + private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) throws ActiveMQException { Map<String, Object>[] messages = new Map[refs.size()]; int i = 0; for (MessageReference ref : refs) { @@ -350,7 +350,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } @Override - public Map<String, Map<String, Object>[]> listDeliveringMessages() { + public Map<String, Map<String, Object>[]> listDeliveringMessages() throws ActiveMQException { checkStarted(); clearIO(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index e831966..c8808b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -47,6 +47,8 @@ public interface PagingStore extends ActiveMQComponent { int getNumberOfPages(); + void criticalException(Throwable e); + /** * Returns the page id of the current page in which the system is writing files. */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java index 8c2d11a..91907ba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java @@ -38,4 +38,6 @@ public interface PagingStoreFactory { SequentialFileFactory newFileFactory(SimpleString address) throws Exception; + void criticalException(Throwable e); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java index 2c82974..bb21b6b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCache.java @@ -41,17 +41,6 @@ public interface PageCache extends SoftValueHashMap.ValueCache { */ PagedMessage getMessage(int messageNumber); - /** - * When the cache is being created, - * We need to first read the files before other threads can get messages from this. - */ - void lock(); - - /** - * You have to call this method within the same thread you called lock - */ - void unlock(); - void close(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index 951b83c..c8404ba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.paging.cursor; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -24,7 +25,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage; */ public interface PageCursorProvider { - PageCache getPageCache(long pageNr); + PageCache getPageCache(long pageNr) throws ActiveMQException; PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub); @@ -38,7 +39,7 @@ public interface PageCursorProvider { PageSubscription createSubscription(long queueId, Filter filter, boolean durable); - PagedMessage getMessage(PagePosition pos); + PagedMessage getMessage(PagePosition pos) throws ActiveMQException; void processReload() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index df2ccc3..386f21f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.cursor; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -95,7 +96,7 @@ public interface PageSubscription { void reloadPageCompletion(PagePosition position); - void reloadPageInfo(long pageNr); + void reloadPageInfo(long pageNr) throws ActiveMQException; /** * To be called when the cursor decided to ignore a position. @@ -147,7 +148,7 @@ public interface PageSubscription { * @param pos * @return */ - PagedMessage queryMessage(PagePosition pos); + PagedMessage queryMessage(PagePosition pos) throws ActiveMQException; /** * @return executor used by the PageSubscription http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java index c1ff089..46041c5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.paging.cursor; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.MessageReference; @@ -23,5 +24,5 @@ public interface PagedReference extends MessageReference { PagePosition getPosition(); - PagedMessage getPagedMessage(); + PagedMessage getPagedMessage() throws ActiveMQException; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 05b88d6..964737f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.paging.cursor; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -49,12 +50,12 @@ public class PagedReferenceImpl implements PagedReference { private boolean alreadyAcked; @Override - public ServerMessage getMessage() { + public ServerMessage getMessage() throws ActiveMQException { return getPagedMessage().getMessage(); } @Override - public synchronized PagedMessage getPagedMessage() { + public synchronized PagedMessage getPagedMessage() throws ActiveMQException { PagedMessage returnMessage = message != null ? message.get() : null; // We only keep a few references on the Queue from paging... @@ -107,25 +108,42 @@ public class PagedReferenceImpl implements PagedReference { @Override public int getMessageMemoryEstimate() { if (messageEstimate < 0) { - messageEstimate = getMessage().getMemoryEstimate(); + try { + messageEstimate = getMessage().getMemoryEstimate(); + } + catch (ActiveMQException e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } } return messageEstimate; } @Override public MessageReference copy(final Queue queue) { - return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription); + try { + return new PagedReferenceImpl(this.position, this.getPagedMessage(), this.subscription); + } + catch (ActiveMQException e) { + ActiveMQServerLogger.LOGGER.warn(e); + return this; + } } @Override public long getScheduledDeliveryTime() { if (deliveryTime == null) { - ServerMessage msg = getMessage(); - if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { - deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + try { + ServerMessage msg = getMessage(); + if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) { + deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME); + } + else { + deliveryTime = 0L; + } } - else { - deliveryTime = 0L; + catch (ActiveMQException e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return 0L; } } return deliveryTime; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java index fd88e7a..29d990a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java @@ -68,16 +68,6 @@ public class LivePageCacheImpl implements LivePageCache { } @Override - public void lock() { - // nothing to be done on live cache - } - - @Override - public void unlock() { - // nothing to be done on live cache - } - - @Override public synchronized boolean isLive() { return isLive; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java index 5efe6d6..8d9b872 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCacheImpl.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.artemis.core.paging.cursor.impl; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.cursor.PageCache; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -32,8 +29,6 @@ class PageCacheImpl implements PageCache { // Attributes ---------------------------------------------------- - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private PagedMessage[] messages; private final Page page; @@ -50,17 +45,11 @@ class PageCacheImpl implements PageCache { @Override public PagedMessage getMessage(final int messageNumber) { - lock.readLock().lock(); - try { - if (messageNumber < messages.length) { - return messages[messageNumber]; - } - else { - return null; - } + if (messageNumber < messages.length) { + return messages[messageNumber]; } - finally { - lock.readLock().unlock(); + else { + return null; } } @@ -70,29 +59,13 @@ class PageCacheImpl implements PageCache { } @Override - public void lock() { - lock.writeLock().lock(); - } - - @Override - public void unlock() { - lock.writeLock().unlock(); - } - - @Override public void setMessages(final PagedMessage[] messages) { this.messages = messages; } @Override public int getNumberOfMessages() { - lock.readLock().lock(); - try { - return messages.length; - } - finally { - lock.readLock().unlock(); - } + return messages.length; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index f9d2bb5..ef57e1c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -109,7 +111,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { } @Override - public PagedMessage getMessage(final PagePosition pos) { + public PagedMessage getMessage(final PagePosition pos) throws ActiveMQException { PageCache cache = getPageCache(pos.getPageNr()); if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) { @@ -128,10 +130,9 @@ public class PageCursorProviderImpl implements PageCursorProvider { } @Override - public PageCache getPageCache(final long pageId) { + public PageCache getPageCache(final long pageId) throws ActiveMQException { try { - boolean needToRead = false; - PageCache cache = null; + PageCache cache; synchronized (softCache) { if (pageId > pagingStore.getCurrentWritingPage()) { return null; @@ -144,47 +145,43 @@ public class PageCursorProviderImpl implements PageCursorProvider { } cache = createPageCache(pageId); - needToRead = true; // anyone reading from this cache will have to wait reading to finish first // we also want only one thread reading this cache - cache.lock(); if (isTrace) { ActiveMQServerLogger.LOGGER.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress()); } + readPage((int) pageId, cache); softCache.put(pageId, cache); } } - // Reading is done outside of the synchronized block, however - // the page stays locked until the entire reading is finished - if (needToRead) { - Page page = null; - try { - page = pagingStore.createPage((int) pageId); + return cache; + } + catch (Throwable e) { + throw new ActiveMQIOErrorException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e); + } + } - storageManager.beforePageRead(); - page.open(); + private void readPage(int pageId, PageCache cache) throws Exception { + Page page = null; + try { + page = pagingStore.createPage(pageId); - List<PagedMessage> pgdMessages = page.read(storageManager); - cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()])); - } - finally { - try { - if (page != null) { - page.close(); - } - } - catch (Throwable ignored) { - } - storageManager.afterPageRead(); - cache.unlock(); - } - } + storageManager.beforePageRead(); + page.open(); - return cache; + List<PagedMessage> pgdMessages = page.read(storageManager); + cache.setMessages(pgdMessages.toArray(new PagedMessage[pgdMessages.size()])); } - catch (Exception e) { - throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e); + finally { + try { + if (page != null) { + page.close(); + } + } + catch (Throwable ignored) { + } + storageManager.afterPageRead(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index b800c4b..d7a6ded 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -32,8 +32,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -332,7 +333,7 @@ final class PageSubscriptionImpl implements PageSubscription { return "PageSubscriptionImpl [cursorId=" + cursorId + ", queue=" + queue + ", filter = " + filter + "]"; } - private PagedReference getReference(PagePosition pos) { + private PagedReference getReference(PagePosition pos) throws ActiveMQException { return cursorProvider.newReference(pos, cursorProvider.getMessage(pos), this); } @@ -341,7 +342,7 @@ final class PageSubscriptionImpl implements PageSubscription { return new CursorIterator(); } - private PagedReference internalGetNext(final PagePosition pos) { + private PagedReference internalGetNext(final PagePosition pos) throws ActiveMQException { PagePosition retPos = pos.nextMessage(); PageCache cache = cursorProvider.getPageCache(pos.getPageNr()); @@ -470,11 +471,17 @@ final class PageSubscriptionImpl implements PageSubscription { public void onError(final int errorCode, final String errorMessage) { error = " errorCode=" + errorCode + ", msg=" + errorMessage; ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, error); + getPagingStore().criticalException(new ActiveMQException(errorMessage)); } @Override public void done() { - processACK(position); + try { + processACK(position); + } + catch (ActiveMQException e) { + getPagingStore().criticalException(e); + } } @Override @@ -504,7 +511,12 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void addPendingDelivery(final PagePosition position) { - getPageInfo(position).incrementPendingTX(); + try { + getPageInfo(position).incrementPendingTX(); + } + catch (Exception e) { + getPagingStore().criticalException(e); + } } @Override @@ -523,13 +535,8 @@ final class PageSubscriptionImpl implements PageSubscription { } @Override - public PagedMessage queryMessage(PagePosition pos) { - try { - return cursorProvider.getMessage(pos); - } - catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } + public PagedMessage queryMessage(PagePosition pos) throws ActiveMQException { + return cursorProvider.getMessage(pos); } /** @@ -547,18 +554,32 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public void reloadPreparedACK(final Transaction tx, final PagePosition position) { deliveredCount.incrementAndGet(); - installTXCallback(tx, position); + try { + installTXCallback(tx, position); + } + catch (Exception e) { + getPagingStore().criticalException(e); + } } @Override public void positionIgnored(final PagePosition position) { - processACK(position); + try { + processACK(position); + } + catch (Exception e) { + getPagingStore().criticalException(e); + } } - @Override public void lateDeliveryRollback(PagePosition position) { - PageCursorInfo cursorInfo = processACK(position); - cursorInfo.decrementPendingTX(); + try { + PageCursorInfo cursorInfo = processACK(position); + cursorInfo.decrementPendingTX(); + } + catch (ActiveMQException e) { + getPagingStore().criticalException(e); + } } @Override @@ -729,15 +750,15 @@ final class PageSubscriptionImpl implements PageSubscription { } @Override - public void reloadPageInfo(long pageNr) { + public void reloadPageInfo(long pageNr) throws ActiveMQException { getPageInfo(pageNr, true); } - private PageCursorInfo getPageInfo(final PagePosition pos) { + private PageCursorInfo getPageInfo(final PagePosition pos) throws ActiveMQException { return getPageInfo(pos.getPageNr(), true); } - private PageCursorInfo getPageInfo(final long pageNr, boolean create) { + private PageCursorInfo getPageInfo(final long pageNr, boolean create) throws ActiveMQException { synchronized (consumedPages) { PageCursorInfo pageInfo = consumedPages.get(pageNr); @@ -771,7 +792,7 @@ final class PageSubscriptionImpl implements PageSubscription { // To be called only after the ACK has been processed and guaranteed to be on storage // The only exception is on non storage events such as not matching messages - private PageCursorInfo processACK(final PagePosition pos) { + private PageCursorInfo processACK(final PagePosition pos) throws ActiveMQException { if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) { if (isTrace) { ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK"); @@ -807,7 +828,7 @@ final class PageSubscriptionImpl implements PageSubscription { * @param tx * @param position */ - private void installTXCallback(final Transaction tx, final PagePosition position) { + private void installTXCallback(final Transaction tx, final PagePosition position) throws ActiveMQException { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); @@ -827,7 +848,7 @@ final class PageSubscriptionImpl implements PageSubscription { } - private PageTransactionInfo getPageTransaction(final PagedReference reference) { + private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException { if (reference.getPagedMessage().getTransactionID() >= 0) { return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID()); } @@ -895,13 +916,24 @@ final class PageSubscriptionImpl implements PageSubscription { @Override public String toString() { - return "PageCursorInfo::PageID=" + pageId + - " numberOfMessage = " + - numberOfMessages + - ", confirmed = " + - confirmed + - ", isDone=" + - this.isDone(); + try { + return "PageCursorInfo::PageID=" + pageId + + " numberOfMessage = " + + numberOfMessages + + ", confirmed = " + + confirmed + + ", isDone=" + + this.isDone(); + } + catch (Exception e) { + return "PageCursorInfo::PageID=" + pageId + + " numberOfMessage = " + + numberOfMessages + + ", confirmed = " + + confirmed + + ", isDone=" + + e.toString(); + } } public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) { @@ -928,7 +960,13 @@ final class PageSubscriptionImpl implements PageSubscription { } public boolean isDone() { - return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0); + try { + return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0); + } + catch (ActiveMQException e) { + getPagingStore().criticalException(e); + throw new RuntimeException(e.getMessage(), e); + } } public boolean isPendingDelete() { @@ -966,12 +1004,17 @@ final class PageSubscriptionImpl implements PageSubscription { public void addACK(final PagePosition posACK) { if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("numberOfMessages = " + getNumberOfMessages() + - " confirmed = " + - (confirmed.get() + 1) + - " pendingTX = " + pendingTX + - ", page = " + - pageId + " posACK = " + posACK); + try { + ActiveMQServerLogger.LOGGER.trace("numberOfMessages = " + getNumberOfMessages() + + " confirmed = " + + (confirmed.get() + 1) + + " pendingTX = " + pendingTX + + ", page = " + + pageId + " posACK = " + posACK); + } + catch (Throwable ignored) { + ActiveMQServerLogger.LOGGER.debug(ignored.getMessage(), ignored); + } } boolean added = internalAddACK(posACK); @@ -1004,7 +1047,7 @@ final class PageSubscriptionImpl implements PageSubscription { } } - private int getNumberOfMessages() { + private int getNumberOfMessages() throws ActiveMQException { if (wasLive) { // if the page was live at any point, we need to // get the number of messages from the page-cache @@ -1023,7 +1066,7 @@ final class PageSubscriptionImpl implements PageSubscription { } - private static final class PageCursorTX extends TransactionOperationAbstract { + private final class PageCursorTX extends TransactionOperationAbstract { private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<>(); @@ -1046,7 +1089,12 @@ final class PageSubscriptionImpl implements PageSubscription { List<PagePosition> positions = entry.getValue(); for (PagePosition confirmed : positions) { - cursor.processACK(confirmed); + try { + cursor.processACK(confirmed); + } + catch (ActiveMQException e) { + getPagingStore().criticalException(e); + } cursor.deliveredCount.decrementAndGet(); } @@ -1125,13 +1173,13 @@ final class PageSubscriptionImpl implements PageSubscription { currentDelivery = moveNext(); return currentDelivery; } - catch (RuntimeException e) { - e.printStackTrace(); - throw e; + catch (ActiveMQException e) { + getPagingStore().criticalException(e); + throw new IllegalStateException(e.getMessage(), e); } } - private PagedReference moveNext() { + private PagedReference moveNext() throws ActiveMQException { synchronized (PageSubscriptionImpl.this) { boolean match = false; @@ -1261,9 +1309,14 @@ final class PageSubscriptionImpl implements PageSubscription { deliveredCount.incrementAndGet(); PagedReference delivery = currentDelivery; if (delivery != null) { - PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPosition()); - if (info != null) { - info.remove(delivery.getPosition()); + try { + PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(currentDelivery.getPosition()); + if (info != null) { + info.remove(currentDelivery.getPosition()); + } + } + catch (ActiveMQException e) { + getPagingStore().criticalException(e); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java index 0b0d210..39cd956 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java @@ -87,6 +87,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory { // Public -------------------------------------------------------- + public void criticalException(Throwable e) { + critialErrorListener.onIOException(e, e.getMessage(), null); + } + @Override public void stop() { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 1463b3c..9136c17 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -176,6 +176,12 @@ public class PagingStoreImpl implements PagingStore { } + @Override + public void criticalException(Throwable e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + storeFactory.criticalException(e); + } + /** * @param addressSettings */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 23dff8d..a0a5200 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -64,6 +64,8 @@ import org.apache.activemq.artemis.utils.IDGenerator; */ public interface StorageManager extends IDGenerator, ActiveMQComponent { + void criticalError(Throwable error); + /** * Get the context associated with the thread for later reuse */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 77cfd0d..390e742 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -54,22 +54,22 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; -import org.apache.activemq.artemis.core.io.SequentialFile; -import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; -import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -111,11 +111,11 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperation; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.ExecutorFactory; -import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.XidCodecSupport; @@ -204,6 +204,8 @@ public class JournalStorageManager implements StorageManager { private boolean journalLoaded = false; + private final IOCriticalErrorListener ioCriticalErrorListener; + private final Configuration config; // Persisted core configuration @@ -222,6 +224,8 @@ public class JournalStorageManager implements StorageManager { final IOCriticalErrorListener criticalErrorListener) { this.executorFactory = executorFactory; + this.ioCriticalErrorListener = criticalErrorListener; + this.config = config; executor = executorFactory.getExecutor(); @@ -276,6 +280,11 @@ public class JournalStorageManager implements StorageManager { } @Override + public void criticalError(Throwable error) { + ioCriticalErrorListener.onIOException(error, error.getMessage(), null); + } + + @Override public void clearContext() { OperationContextImpl.clearContext(); } @@ -3031,7 +3040,7 @@ public class JournalStorageManager implements StorageManager { bridgeRepresentation + "]"; } else { - return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]"; + return "DuplicateIDEncoding [address=" + address + ",str=" + ByteUtil.toSimpleString(duplID) + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]"; } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 381222f..39c5de5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -28,9 +28,10 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; -import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.paging.PageTransactionInfo; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -62,6 +63,26 @@ public class NullStorageManager implements StorageManager { private volatile boolean started; + private final IOCriticalErrorListener ioCriticalErrorListener; + + public NullStorageManager(IOCriticalErrorListener ioCriticalErrorListener) { + this.ioCriticalErrorListener = ioCriticalErrorListener; + } + + public NullStorageManager() { + this(new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + code.printStackTrace(); + } + }); + } + + @Override + public void criticalError(Throwable error) { + + } + private static final OperationContext dummyContext = new OperationContext() { @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java index f316c56..26920f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java @@ -25,6 +25,8 @@ public interface DuplicateIDCache { boolean contains(byte[] duplicateID); + boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception; + void addToCache(byte[] duplicateID) throws Exception; void addToCache(byte[] duplicateID, Transaction tx) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index e6bb837..d07ea5b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -44,7 +44,7 @@ public interface PostOffice extends ActiveMQComponent { void addBinding(Binding binding) throws Exception; - Binding removeBinding(SimpleString uniqueName, Transaction tx) throws Exception; + Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception; /** * It will lookup the Binding without creating an item on the Queue if non-existent http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index 7059671..cfeeb7b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -29,6 +30,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; +import org.apache.activemq.artemis.utils.ByteUtil; /** * A DuplicateIDCacheImpl @@ -37,6 +39,8 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract */ public class DuplicateIDCacheImpl implements DuplicateIDCache { + private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); + // ByteHolder, position private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>(); @@ -71,12 +75,27 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public void load(final List<Pair<byte[], Long>> theIds) throws Exception { - int count = 0; - long txID = -1; + // If we have more IDs than cache size, we shrink the first ones + int deleteCount = theIds.size() - cacheSize; + if (deleteCount < 0) { + deleteCount = 0; + } + for (Pair<byte[], Long> id : theIds) { - if (count < cacheSize) { + if (deleteCount > 0) { + if (txID == -1) { + txID = storageManager.generateID(); + } + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB())); + } + + storageManager.deleteDuplicateIDTransactional(txID, id.getB()); + deleteCount--; + } + else { ByteArrayHolder bah = new ByteArrayHolder(id.getA()); Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB()); @@ -84,17 +103,11 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { cache.put(bah, ids.size()); ids.add(pair); - } - else { - // cache size has been reduced in config - delete the extra records - if (txID == -1) { - txID = storageManager.generateID(); + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB())); } - - storageManager.deleteDuplicateIDTransactional(txID, id.getB()); } - count++; } if (txID != -1) { @@ -111,6 +124,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public void deleteFromCache(byte[] duplicateID) throws Exception { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl::deleteFromCache deleting id=" + describeID(duplicateID, 0)); + } + ByteArrayHolder bah = new ByteArrayHolder(duplicateID); Integer posUsed = cache.remove(bah); @@ -124,6 +141,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { if (id.getA().equals(bah)) { id.setA(null); storageManager.deleteDuplicateID(id.getB()); + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::deleteFromCache deleting id=" + describeID(duplicateID, id.getB())); + } id.setB(null); } } @@ -131,9 +151,23 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } + private String describeID(byte[] duplicateID, long id) { + if (id != 0) { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID); + } + else { + return ByteUtil.bytesToHex(duplicateID, 4) + ", simpleString=" + ByteUtil.toSimpleString(duplicateID) + ", id=" + id; + } + } + @Override public boolean contains(final byte[] duplID) { - return cache.get(new ByteArrayHolder(duplID)) != null; + boolean contains = cache.get(new ByteArrayHolder(duplID)) != null; + + if (contains) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0)); + } + return contains; } @Override @@ -147,6 +181,21 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } @Override + public synchronized boolean atomicVerify(final byte[] duplID, final Transaction tx) throws Exception { + + if (contains(duplID)) { + if (tx != null) { + tx.markAsRollbackOnly(new ActiveMQDuplicateIdException()); + } + return false; + } + else { + addToCache(duplID, tx, true); + return true; + } + + } + public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception { long recordID = -1; @@ -170,6 +219,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { addToCacheInMemory(duplID, recordID); } else { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID)); + } // For a tx, it's important that the entry is not added to the cache until commit // since if the client fails then resends them tx we don't want it to get rejected tx.addOperation(new AddDuplicateIDOperation(duplID, recordID)); @@ -183,6 +235,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID)); + } + ByteArrayHolder holder = new ByteArrayHolder(duplID); cache.put(holder, pos); @@ -195,6 +251,10 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { // The id here might be null if it was explicit deleted if (id.getA() != null) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB())); + } + cache.remove(id.getA()); // Record already exists - we delete the old one and add the new one @@ -217,11 +277,19 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { // -1 would mean null on this case id.setB(recordID >= 0 ? recordID : null); + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB())); + } + holder.pos = pos; } else { id = new Pair<>(holder, recordID >= 0 ? recordID : null); + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB())); + } + ids.add(id); holder.pos = pos; @@ -234,6 +302,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { @Override public void clear() throws Exception { + ActiveMQServerLogger.LOGGER.debug("DuplicateIDCacheImpl(" + this.address + ")::clear removing duplicate ID data"); synchronized (this) { if (ids.size() > 0) { long tx = storageManager.generateID(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 4b25023..659c612 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -41,8 +41,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.NotificationType; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; @@ -463,7 +463,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public synchronized Binding removeBinding(final SimpleString uniqueName, Transaction tx) throws Exception { + public synchronized Binding removeBinding(final SimpleString uniqueName, + Transaction tx, + boolean deleteData) throws Exception { addressSettingsRepository.clearCache(); @@ -473,7 +475,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding throw new ActiveMQNonExistentQueueException(); } - if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) { + if (deleteData && addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null) { pagingManager.deletePageStore(binding.getAddress()); managementService.unregisterAddress(binding.getAddress()); @@ -1159,31 +1161,19 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(message.getAddress())); - if (cacheBridge.contains(bridgeDupBytes)) { - ActiveMQServerLogger.LOGGER.duplicateMessageDetectedThruBridge(message); - - if (context.getTransaction() != null) { - context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException()); - } + if (context.getTransaction() == null) { + context.setTransaction(new TransactionImpl(storageManager)); + startedTX.set(true); + } + if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) { + context.getTransaction().rollback(); + startedTX.set(false); message.decrementRefCount(); - return false; } - else { - if (context.getTransaction() == null) { - context.setTransaction(new TransactionImpl(storageManager)); - startedTX.set(true); - } - } - - // on the bridge case there is a case where the bridge reconnects - // and the send hasn't finished yet (think of CPU outages). - // for that reason we add the cache right away - cacheBridge.addToCache(bridgeDupBytes, context.getTransaction(), true); message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID); - } else { // if used BridgeDuplicate, it's not going to use the regular duplicate @@ -1222,7 +1212,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding startedTX.set(true); } - cache.addToCache(duplicateIDBytes, context.getTransaction()); + cache.addToCache(duplicateIDBytes, context.getTransaction(), false); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 78fd83f..3285bc1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -16,55 +16,21 @@ */ package org.apache.activemq.artemis.core.protocol.core; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_EXPIRED; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_END; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_START; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND; - -import java.util.List; - import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; @@ -105,14 +71,48 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; -import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_COMMIT; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CONSUMER_CLOSE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CREATECONSUMER; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_EXPIRED; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_INDIVIDUAL_ACKNOWLEDGE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_QUEUEQUERY; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ROLLBACK; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_CONTINUATION; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_SEND_LARGE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_START; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_STOP; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_COMMIT; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_END; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FAILED; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_FORGET; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_GET_TIMEOUT; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_INDOUBT_XIDS; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_JOIN; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_PREPARE; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_RESUME; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_ROLLBACK; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_START; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND; + public class ServerSessionPacketHandler implements ChannelHandler { private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -483,6 +483,16 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } } + catch (ActiveMQIOErrorException e) { + getSession().markTXFailed(e); + if (requiresResponse) { + ActiveMQServerLogger.LOGGER.debug("Sending exception to client", e); + response = new ActiveMQExceptionMessage(e); + } + else { + ActiveMQServerLogger.LOGGER.caughtException(e); + } + } catch (ActiveMQXAException e) { if (requiresResponse) { ActiveMQServerLogger.LOGGER.debug("Sending exception to client", e); @@ -507,6 +517,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } catch (Throwable t) { + getSession().markTXFailed(t); if (requiresResponse) { ActiveMQServerLogger.LOGGER.warn("Sending unexpected exception to the client", t); ActiveMQException activeMQInternalErrorException = new ActiveMQInternalErrorException(); @@ -611,7 +622,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence()); - Connection oldTransportConnection = remotingConnection.getTransportConnection(); remotingConnection = newConnection; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 0ff55ac..95b30b2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.core.ActiveMQException; + /** * A reference to a message. * @@ -25,7 +27,7 @@ public interface MessageReference { boolean isPaged(); - ServerMessage getMessage(); + ServerMessage getMessage() throws ActiveMQException; /** * We define this method aggregation here because on paging we need to hold the original estimate, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index f5a19a8..9ea60cd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -122,7 +123,7 @@ public interface Queue extends Bindable { MessageReference removeReferenceWithID(long id) throws Exception; - MessageReference getReference(long id); + MessageReference getReference(long id) throws ActiveMQException; int deleteAllReferences() throws Exception; @@ -236,9 +237,9 @@ public interface Queue extends Bindable { /** * cancels scheduled messages and send them to the head of the queue. */ - void deliverScheduledMessages(); + void deliverScheduledMessages() throws ActiveMQException; - void postAcknowledge(MessageReference ref); + void postAcknowledge(MessageReference ref) throws ActiveMQException; float getRate(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java index 5afb052..8260507 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.filter.Filter; import java.util.List; @@ -28,7 +29,7 @@ public interface ScheduledDeliveryHandler { List<MessageReference> getScheduledReferences(); - List<MessageReference> cancel(Filter filter); + List<MessageReference> cancel(Filter filter) throws ActiveMQException; - MessageReference removeReferenceWithID(long id); + MessageReference removeReferenceWithID(long id) throws ActiveMQException; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 6026887..62bb3b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -73,6 +73,8 @@ public interface ServerSession extends SecurityAuth { void xaSuspend() throws Exception; + void markTXFailed(Throwable e); + QueueCreator getQueueCreator(); List<Xid> xaGetInDoubtXids(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 9624ab9..64f99eb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -1244,7 +1244,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn throw new IllegalStateException("Cannot find binding for queue " + clusterName); } - postOffice.removeBinding(binding.getUniqueName(), null); + postOffice.removeBinding(binding.getUniqueName(), null, false); } private synchronized void resetBinding(final SimpleString clusterName) throws Exception {
