Repository: activemq-artemis Updated Branches: refs/heads/master 8b6d3a65b -> a8443512b
fixing BackupSyncJournalTest Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e973c4b Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e973c4b Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e973c4b Branch: refs/heads/master Commit: 2e973c4bff6911ed2a0a6f86b41531e7a7a212cb Parents: 2d3061d Author: Clebert Suconic <[email protected]> Authored: Tue Jan 12 15:39:31 2016 -0500 Committer: Clebert Suconic <[email protected]> Committed: Tue Jan 12 17:22:56 2016 -0500 ---------------------------------------------------------------------- .../core/impl/wireformat/SessionReceiveLargeMessage.java | 4 +++- .../apache/activemq/artemis/core/server/impl/QueueImpl.java | 5 ++--- .../activemq/artemis/core/server/impl/ServerConsumerImpl.java | 7 ++++--- 3 files changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e973c4b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java index 8b32256..aa5d98f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveLargeMessage.java @@ -84,7 +84,9 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac buffer.writeLong(consumerID); buffer.writeInt(deliveryCount); buffer.writeLong(largeMessageSize); - message.encodeHeadersAndProperties(buffer); + if (message != null) { + message.encodeHeadersAndProperties(buffer); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e973c4b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 12b5231..6f3cc6f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2493,8 +2493,10 @@ public class QueueImpl implements Queue { private void proceedDeliver(Consumer consumer, MessageReference reference) { try { consumer.proceedDeliver(reference); + deliveriesInTransit.countDown(); } catch (Throwable t) { + deliveriesInTransit.countDown(); ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference); synchronized (this) { @@ -2510,9 +2512,6 @@ public class QueueImpl implements Queue { addHead(reference); } } - finally { - deliveriesInTransit.countDown(); - } } private boolean checkExpired(final MessageReference reference) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e973c4b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 9468819..1cfc4f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -970,7 +970,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { public boolean deliver() throws Exception { lockDelivery.readLock().lock(); try { - if (largeMessage == null) { + LargeServerMessage currentLargeMessage = largeMessage; + if (currentLargeMessage == null) { return true; } @@ -984,7 +985,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } if (!sentInitialPacket) { - context = largeMessage.getBodyEncoder(); + context = currentLargeMessage.getBodyEncoder(); sizePendingLargeMessage = context.getLargeBodySize(); @@ -992,7 +993,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { sentInitialPacket = true; - int packetSize = callback.sendLargeMessage(largeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); + int packetSize = callback.sendLargeMessage(currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount()); if (availableCredits != null) { availableCredits.addAndGet(-packetSize);
