This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 05628ce92e9b947cb03353aef45d9cc34e9e1da3 Author: Michael André Pearce <[email protected]> AuthorDate: Mon Apr 15 14:02:11 2019 +0100 ARTEMIS-2306 Support ActiveMQ5 feature JMSXGroupFirstForConsumer Implement using the ActiveMQ5 JMSXGroupFirstForConsumer, property as default, but make it possible for future to make it configurable easily. (Not this PR) Add test --- .../api/config/ActiveMQDefaultConfiguration.java | 6 + .../server/impl/GroupFirstMessageReference.java | 208 +++++++++++++++++++++ .../artemis/core/server/impl/QueueImpl.java | 19 +- .../integration/amqp/JMSMessageGroupsTest.java | 5 + 4 files changed, 233 insertions(+), 5 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 7ea952b..433d3c9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -523,6 +523,8 @@ public final class ActiveMQDefaultConfiguration { public static final boolean DEFAULT_GROUP_REBALANCE = false; + public static final SimpleString DEFAULT_GROUP_FIRST_KEY = SimpleString.toSimpleString("JMSXFirstInGroupID"); + public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST; public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig."; @@ -1430,6 +1432,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_GROUP_REBALANCE; } + public static SimpleString getDefaultGroupFirstKey() { + return DEFAULT_GROUP_FIRST_KEY; + } + public static String getInternalNamingPrefix() { return DEFAULT_INTERNAL_NAMING_PREFIX; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java new file mode 100644 index 0000000..d02ef70 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.util.function.Consumer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.transaction.Transaction; + +/** + * This MessageReference should only be created the first time a group is assigned to a consumer, + * it allows us to make a copy of the message to add the property safely as a delivery semantic, + * without affecting the underlying message. + * + * The overhead is low, as noted above only should be created on first message in a group to a consumer. + */ +public class GroupFirstMessageReference implements MessageReference { + + private final MessageReference messageReference; + private final SimpleString key; + private volatile Message message; + + public GroupFirstMessageReference(SimpleString key, MessageReference messageReference) { + this.messageReference = messageReference; + this.key = key; + } + + @Override + public Message getMessage() { + if (message == null) { + synchronized (this) { + if (message == null) { + message = messageReference.getMessage().copy(); + message.putBooleanProperty(key, true).reencode(); + } + } + } + return message; + } + + @Override + public boolean isPaged() { + return messageReference.isPaged(); + } + + @Override + public long getMessageID() { + return messageReference.getMessageID(); + } + + @Override + public boolean isDurable() { + return messageReference.isDurable(); + } + + @Override + public SimpleString getLastValueProperty() { + return messageReference.getLastValueProperty(); + } + + @Override + public void onDelivery(Consumer<? super MessageReference> callback) { + messageReference.onDelivery(callback); + } + + @Override + public int getMessageMemoryEstimate() { + return messageReference.getMessageMemoryEstimate(); + } + + @Override + public Object getProtocolData() { + return messageReference.getProtocolData(); + } + + @Override + public void setProtocolData(Object data) { + messageReference.setProtocolData(data); + } + + @Override + public MessageReference copy(Queue queue) { + return messageReference.copy(queue); + } + + @Override + public long getScheduledDeliveryTime() { + return messageReference.getScheduledDeliveryTime(); + } + + @Override + public void setScheduledDeliveryTime(long scheduledDeliveryTime) { + messageReference.setScheduledDeliveryTime(scheduledDeliveryTime); + } + + @Override + public int getDeliveryCount() { + return messageReference.getDeliveryCount(); + } + + @Override + public void setDeliveryCount(int deliveryCount) { + messageReference.setDeliveryCount(deliveryCount); + } + + @Override + public void setPersistedCount(int deliveryCount) { + messageReference.setPersistedCount(deliveryCount); + } + + @Override + public int getPersistedCount() { + return messageReference.getPersistedCount(); + } + + @Override + public void incrementDeliveryCount() { + messageReference.incrementDeliveryCount(); + } + + @Override + public void decrementDeliveryCount() { + messageReference.decrementDeliveryCount(); + } + + @Override + public Queue getQueue() { + return messageReference.getQueue(); + } + + @Override + public void acknowledge() throws Exception { + messageReference.acknowledge(); + } + + @Override + public void acknowledge(Transaction tx) throws Exception { + messageReference.acknowledge(tx); + } + + @Override + public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception { + messageReference.acknowledge(tx, consumer); + } + + @Override + public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception { + messageReference.acknowledge(tx, reason, consumer); + } + + @Override + public void emptyConsumerID() { + messageReference.emptyConsumerID(); + } + + @Override + public void setConsumerId(long consumerID) { + messageReference.setConsumerId(consumerID); + } + + @Override + public boolean hasConsumerId() { + return messageReference.hasConsumerId(); + } + + @Override + public long getConsumerId() { + return messageReference.getConsumerId(); + } + + @Override + public void handled() { + messageReference.handled(); + } + + @Override + public void setAlreadyAcked() { + messageReference.setAlreadyAcked(); + } + + @Override + public boolean isAlreadyAcked() { + return messageReference.isAlreadyAcked(); + } + + @Override + public long getPersistentSize() throws ActiveMQException { + return messageReference.getPersistentSize(); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 44f938e..b5205d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -242,6 +242,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile int groupBuckets; + private volatile SimpleString groupFirstKey; + private MessageGroups<Consumer> groups; private volatile Consumer exclusiveConsumer; @@ -496,6 +498,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { this.groups = groupMap(this.groupBuckets); + this.groupFirstKey = ActiveMQDefaultConfiguration.getDefaultGroupFirstKey(); + this.autoDelete = autoDelete == null ? ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete(autoCreated) : autoDelete; this.autoDeleteDelay = autoDeleteDelay == null ? ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay() : autoDeleteDelay; @@ -2552,7 +2556,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (status == HandleStatus.HANDLED) { if (redistributor == null) { - handleMessageGroup(ref, consumer, groupConsumer, groupID); + ref = handleMessageGroup(ref, consumer, groupConsumer, groupID); } deliveriesInTransit.countUp(); @@ -3163,15 +3167,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - + final MessageReference reference; if (redistributor == null) { - handleMessageGroup(ref, consumer, groupConsumer, groupID); + reference = handleMessageGroup(ref, consumer, groupConsumer, groupID); + } else { + reference = ref; } messagesAdded.incrementAndGet(); deliveriesInTransit.countUp(); - proceedDeliver(consumer, ref); + proceedDeliver(consumer, reference); consumers.reset(); return true; } @@ -3202,10 +3208,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return groupConsumer; } - private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { + private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) { if (exclusive) { if (groupConsumer == null) { exclusiveConsumer = consumer; + return new GroupFirstMessageReference(groupFirstKey, ref); } consumers.repeat(); } else if (groupID != null) { @@ -3214,10 +3221,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumers.repeat(); } else if (groupConsumer == null) { groups.put(groupID, consumer); + return new GroupFirstMessageReference(groupFirstKey, ref); } else { consumers.repeat(); } } + return ref; } private void proceedDeliver(Consumer consumer, MessageReference reference) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java index b0bb8a6..e23c788 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java @@ -208,6 +208,11 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport { LOG.debug("Message assigned JMSXGroupID := {}", gid); LOG.debug("Message assigned JMSXGroupSeq := {}", seq); assertEquals("Sequence order should match", sequence.incrementAndGet(), seq); + if (i == 0) { + assertTrue("Message should be marked with first in Group", message.getBooleanProperty("JMSXFirstInGroupID")); + } else { + assertFalse("Message should NOT be marked with first in Group", message.propertyExists("JMSXFirstInGroupID")); + } } session.close();
