This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 05628ce92e9b947cb03353aef45d9cc34e9e1da3
Author: Michael AndrĂ© Pearce <[email protected]>
AuthorDate: Mon Apr 15 14:02:11 2019 +0100

    ARTEMIS-2306 Support ActiveMQ5 feature JMSXGroupFirstForConsumer
    
    Implement using the ActiveMQ5 JMSXGroupFirstForConsumer, property as 
default, but make it possible for future to make it configurable easily. (Not 
this PR)
    Add test
---
 .../api/config/ActiveMQDefaultConfiguration.java   |   6 +
 .../server/impl/GroupFirstMessageReference.java    | 208 +++++++++++++++++++++
 .../artemis/core/server/impl/QueueImpl.java        |  19 +-
 .../integration/amqp/JMSMessageGroupsTest.java     |   5 +
 4 files changed, 233 insertions(+), 5 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 7ea952b..433d3c9 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -523,6 +523,8 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final boolean DEFAULT_GROUP_REBALANCE = false;
 
+   public static final SimpleString DEFAULT_GROUP_FIRST_KEY = 
SimpleString.toSimpleString("JMSXFirstInGroupID");
+
    public static final RoutingType DEFAULT_ROUTING_TYPE = 
RoutingType.MULTICAST;
 
    public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig.";
@@ -1430,6 +1432,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_GROUP_REBALANCE;
    }
 
+   public static SimpleString getDefaultGroupFirstKey() {
+      return DEFAULT_GROUP_FIRST_KEY;
+   }
+
    public static String getInternalNamingPrefix() {
       return DEFAULT_INTERNAL_NAMING_PREFIX;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
new file mode 100644
index 0000000..d02ef70
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.function.Consumer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+
+/**
+ * This MessageReference should only be created the first time a group is 
assigned to a consumer,
+ * it allows us to make a copy of the message to add the property safely as a 
delivery semantic,
+ * without affecting the underlying message.
+ *
+ * The overhead is low, as noted above only should be created on first message 
in a group to a consumer.
+ */
+public class GroupFirstMessageReference implements MessageReference {
+
+   private final MessageReference messageReference;
+   private final SimpleString key;
+   private volatile Message message;
+
+   public GroupFirstMessageReference(SimpleString key, MessageReference 
messageReference) {
+      this.messageReference = messageReference;
+      this.key = key;
+   }
+
+   @Override
+   public Message getMessage() {
+      if (message == null) {
+         synchronized (this) {
+            if (message == null) {
+               message = messageReference.getMessage().copy();
+               message.putBooleanProperty(key, true).reencode();
+            }
+         }
+      }
+      return message;
+   }
+
+   @Override
+   public boolean isPaged() {
+      return messageReference.isPaged();
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageReference.getMessageID();
+   }
+
+   @Override
+   public boolean isDurable() {
+      return messageReference.isDurable();
+   }
+
+   @Override
+   public SimpleString getLastValueProperty() {
+      return messageReference.getLastValueProperty();
+   }
+
+   @Override
+   public void onDelivery(Consumer<? super MessageReference> callback) {
+      messageReference.onDelivery(callback);
+   }
+
+   @Override
+   public int getMessageMemoryEstimate() {
+      return messageReference.getMessageMemoryEstimate();
+   }
+
+   @Override
+   public Object getProtocolData() {
+      return messageReference.getProtocolData();
+   }
+
+   @Override
+   public void setProtocolData(Object data) {
+      messageReference.setProtocolData(data);
+   }
+
+   @Override
+   public MessageReference copy(Queue queue) {
+      return messageReference.copy(queue);
+   }
+
+   @Override
+   public long getScheduledDeliveryTime() {
+      return messageReference.getScheduledDeliveryTime();
+   }
+
+   @Override
+   public void setScheduledDeliveryTime(long scheduledDeliveryTime) {
+      messageReference.setScheduledDeliveryTime(scheduledDeliveryTime);
+   }
+
+   @Override
+   public int getDeliveryCount() {
+      return messageReference.getDeliveryCount();
+   }
+
+   @Override
+   public void setDeliveryCount(int deliveryCount) {
+      messageReference.setDeliveryCount(deliveryCount);
+   }
+
+   @Override
+   public void setPersistedCount(int deliveryCount) {
+      messageReference.setPersistedCount(deliveryCount);
+   }
+
+   @Override
+   public int getPersistedCount() {
+      return messageReference.getPersistedCount();
+   }
+
+   @Override
+   public void incrementDeliveryCount() {
+      messageReference.incrementDeliveryCount();
+   }
+
+   @Override
+   public void decrementDeliveryCount() {
+      messageReference.decrementDeliveryCount();
+   }
+
+   @Override
+   public Queue getQueue() {
+      return messageReference.getQueue();
+   }
+
+   @Override
+   public void acknowledge() throws Exception {
+      messageReference.acknowledge();
+   }
+
+   @Override
+   public void acknowledge(Transaction tx) throws Exception {
+      messageReference.acknowledge(tx);
+   }
+
+   @Override
+   public void acknowledge(Transaction tx, ServerConsumer consumer) throws 
Exception {
+      messageReference.acknowledge(tx, consumer);
+   }
+
+   @Override
+   public void acknowledge(Transaction tx, AckReason reason, ServerConsumer 
consumer) throws Exception {
+      messageReference.acknowledge(tx, reason, consumer);
+   }
+
+   @Override
+   public void emptyConsumerID() {
+      messageReference.emptyConsumerID();
+   }
+
+   @Override
+   public void setConsumerId(long consumerID) {
+      messageReference.setConsumerId(consumerID);
+   }
+
+   @Override
+   public boolean hasConsumerId() {
+      return messageReference.hasConsumerId();
+   }
+
+   @Override
+   public long getConsumerId() {
+      return messageReference.getConsumerId();
+   }
+
+   @Override
+   public void handled() {
+      messageReference.handled();
+   }
+
+   @Override
+   public void setAlreadyAcked() {
+      messageReference.setAlreadyAcked();
+   }
+
+   @Override
+   public boolean isAlreadyAcked() {
+      return messageReference.isAlreadyAcked();
+   }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return messageReference.getPersistentSize();
+   }
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 44f938e..b5205d8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -242,6 +242,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private volatile int groupBuckets;
 
+   private volatile SimpleString groupFirstKey;
+
    private MessageGroups<Consumer> groups;
 
    private volatile Consumer exclusiveConsumer;
@@ -496,6 +498,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       this.groups = groupMap(this.groupBuckets);
 
+      this.groupFirstKey = 
ActiveMQDefaultConfiguration.getDefaultGroupFirstKey();
+
       this.autoDelete = autoDelete == null ? 
ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete(autoCreated) : 
autoDelete;
 
       this.autoDeleteDelay = autoDeleteDelay == null ? 
ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay() : autoDeleteDelay;
@@ -2552,7 +2556,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                if (status == HandleStatus.HANDLED) {
 
                   if (redistributor == null) {
-                     handleMessageGroup(ref, consumer, groupConsumer, groupID);
+                     ref = handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
                   }
 
                   deliveriesInTransit.countUp();
@@ -3163,15 +3167,17 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             HandleStatus status = handle(ref, consumer);
 
             if (status == HandleStatus.HANDLED) {
-
+               final MessageReference reference;
                if (redistributor == null) {
-                  handleMessageGroup(ref, consumer, groupConsumer, groupID);
+                  reference = handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
+               } else {
+                  reference = ref;
                }
 
                messagesAdded.incrementAndGet();
 
                deliveriesInTransit.countUp();
-               proceedDeliver(consumer, ref);
+               proceedDeliver(consumer, reference);
                consumers.reset();
                return true;
             }
@@ -3202,10 +3208,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       return groupConsumer;
    }
 
-   private void handleMessageGroup(MessageReference ref, Consumer consumer, 
Consumer groupConsumer, SimpleString groupID) {
+   private MessageReference handleMessageGroup(MessageReference ref, Consumer 
consumer, Consumer groupConsumer, SimpleString groupID) {
       if (exclusive) {
          if (groupConsumer == null) {
             exclusiveConsumer = consumer;
+            return new GroupFirstMessageReference(groupFirstKey, ref);
          }
          consumers.repeat();
       } else if (groupID != null) {
@@ -3214,10 +3221,12 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             consumers.repeat();
          } else if (groupConsumer == null) {
             groups.put(groupID, consumer);
+            return new GroupFirstMessageReference(groupFirstKey, ref);
          } else {
             consumers.repeat();
          }
       }
+      return ref;
    }
 
    private void proceedDeliver(Consumer consumer, MessageReference reference) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
index b0bb8a6..e23c788 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
@@ -208,6 +208,11 @@ public class JMSMessageGroupsTest extends 
JMSClientTestSupport {
          LOG.debug("Message assigned JMSXGroupID := {}", gid);
          LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
          assertEquals("Sequence order should match", 
sequence.incrementAndGet(), seq);
+         if (i == 0) {
+            assertTrue("Message should be marked with first in Group", 
message.getBooleanProperty("JMSXFirstInGroupID"));
+         } else {
+            assertFalse("Message should NOT be marked with first in Group", 
message.propertyExists("JMSXFirstInGroupID"));
+         }
       }
 
       session.close();

Reply via email to