This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 18bcd21 ARTEMIS-2306 Make group first off by default, unless
configured
new 8477dd9 This closes #2625
18bcd21 is described below
commit 18bcd21c3ee91f86ff3013a57765be3ae2358e93
Author: Michael André Pearce <[email protected]>
AuthorDate: Wed Apr 17 11:27:14 2019 +0100
ARTEMIS-2306 Make group first off by default, unless configured
---
.../api/config/ActiveMQDefaultConfiguration.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 8 +-
.../resources/schema/artemis-configuration.xsd | 2 +-
.../src/test/resources/artemis-configuration.xsd | 2 +-
docs/user-manual/en/message-grouping.md | 38 +++++++++
.../integration/amqp/JMSMessageGroupsTest.java | 98 +++++++++++++++++++---
.../tests/integration/jms/client/GroupingTest.java | 25 ++----
7 files changed, 142 insertions(+), 33 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 433d3c9..8547414 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -523,7 +523,7 @@ public final class ActiveMQDefaultConfiguration {
public static final boolean DEFAULT_GROUP_REBALANCE = false;
- public static final SimpleString DEFAULT_GROUP_FIRST_KEY =
SimpleString.toSimpleString("JMSXFirstInGroupID");
+ public static final SimpleString DEFAULT_GROUP_FIRST_KEY = null;
public static final RoutingType DEFAULT_ROUTING_TYPE =
RoutingType.MULTICAST;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 5c24143..56bae54 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3256,7 +3256,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (exclusive) {
if (groupConsumer == null) {
exclusiveConsumer = consumer;
- return new GroupFirstMessageReference(groupFirstKey, ref);
+ if (groupFirstKey != null) {
+ return new GroupFirstMessageReference(groupFirstKey, ref);
+ }
}
consumers.repeat();
} else if (groupID != null) {
@@ -3265,7 +3267,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
consumers.repeat();
} else if (groupConsumer == null) {
groups.put(groupID, consumer);
- return new GroupFirstMessageReference(groupFirstKey, ref);
+ if (groupFirstKey != null) {
+ return new GroupFirstMessageReference(groupFirstKey, ref);
+ }
} else {
consumers.repeat();
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index d1484d4..957f8b4 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3056,7 +3056,7 @@
</xsd:annotation>
</xsd:element>
- <xsd:element name="default-group-first-key" type="xsd:string"
default="JMSXFirstInGroupID" maxOccurs="1" minOccurs="0">
+ <xsd:element name="default-group-first-key" type="xsd:string"
maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
key used to mark a message is first in a group for a
consumer
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd
b/artemis-tools/src/test/resources/artemis-configuration.xsd
index db5141d..1dbfb74 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -3056,7 +3056,7 @@
</xsd:annotation>
</xsd:element>
- <xsd:element name="default-group-first-key" type="xsd:string"
default="JMSXFirstInGroupID" maxOccurs="1" minOccurs="0">
+ <xsd:element name="default-group-first-key" type="xsd:string"
maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
key used to mark a message is first in a group for a
consumer
diff --git a/docs/user-manual/en/message-grouping.md
b/docs/user-manual/en/message-grouping.md
index b53c3a6..405b868 100644
--- a/docs/user-manual/en/message-grouping.md
+++ b/docs/user-manual/en/message-grouping.md
@@ -98,6 +98,44 @@ producer.send(message);
This then closes the message group so if another message is sent in the future
with the same message group ID it will be reassigned to a new consumer.
+#### Notifying Consumer of Group Ownership change
+
+ActiveMQ supports putting a boolean header, set on the first message sent to a
consumer for a particular message group.
+
+To enable this, you must set a header key that the broker will use to set the
flag.
+
+In the examples we use `JMSXGroupFirstForConsumer` but it can be any header
key value you want.
+
+
+By setting `group-first-key` to `JMSXGroupFirstForConsumer` at the queue
level, every time a new group is assigned a consumer the header
`JMSXGroupFirstForConsumer` will be set to true on the first message.
+
+```xml
+<address name="foo.bar">
+ <multicast>
+ <queue name="orders1" group-first-key="JMSXGroupFirstForConsumer"/>
+ </multicast>
+</address>
+```
+
+Or on auto-create when using the JMS Client by using address parameters when
+creating the destination used by the consumer.
+
+```java
+Queue queue =
session.createQueue("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
+Topic topic =
session.createTopic("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
+```
+
+Also the default for all queues under an address can be defaulted using the
+`address-setting` configuration:
+
+```xml
+<address-setting match="my.address">
+ <default-group-first-key>JMSXGroupFirstForConsumer</default-group-first-key>
+</address-setting>
+```
+
+By default this is null, and therefore OFF.
+
#### Rebalancing Message Groups
Sometimes after new consumers are added you can find that if you have long
lived groups, that they have no groups assigned, and thus are not being
utilised, this is because the long lived groups will already be assigned to
existing consumers.
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
index e23c788..5f7d22e 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -30,6 +31,9 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +57,23 @@ public class JMSMessageGroupsTest extends
JMSClientTestSupport {
return "AMQP,OPENWIRE,CORE";
}
+ @Override
+ protected void configureAddressPolicy(ActiveMQServer server) {
+ super.configureAddressPolicy(server);
+
+ AddressSettings addressSettings = new AddressSettings();
+
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ addressSettings.setAutoCreateQueues(isAutoCreateQueues());
+ addressSettings.setAutoCreateAddresses(isAutoCreateAddresses());
+
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
+
addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
+
addressSettings.setDefaultGroupFirstKey(SimpleString.toSimpleString("JMSXFirstInGroupID"));
+
+
+ server.getConfiguration().getAddressesSettings().put("GroupFirst.#",
addressSettings);
+ }
+
@Test(timeout = 60000)
public void testMessageGroupsAMQPProducerAMQPConsumer() throws Exception {
testMessageGroups(AMQPConnection, AMQPConnection);
@@ -102,6 +123,8 @@ public class JMSMessageGroupsTest extends
JMSClientTestSupport {
public void testMessageGroups(ConnectionSupplier
producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier)
throws Exception {
testGroupSeqIsNeverLost(producerConnectionSupplier,
consumerConnectionSupplier);
testGroupSeqCloseGroup(producerConnectionSupplier,
consumerConnectionSupplier);
+ testGroupFirst(producerConnectionSupplier, consumerConnectionSupplier);
+ testGroupFirstDefaultOff(producerConnectionSupplier,
consumerConnectionSupplier);
}
@@ -184,19 +207,72 @@ public class JMSMessageGroupsTest extends
JMSClientTestSupport {
public void testGroupSeqIsNeverLost(ConnectionSupplier
producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier)
throws Exception {
AtomicInteger sequenceCounter = new AtomicInteger();
AtomicInteger consumedSequenceCounter = new AtomicInteger();
+ String queueName = getQueueName();
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ try (Connection producerConnection =
producerConnectionSupplier.createConnection();
+ Connection consumerConnection =
consumerConnectionSupplier.createConnection()) {
+ sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT,
sequenceCounter);
+ readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT,
consumedSequenceCounter, null);
+ }
+ }
+ }
+
+ public void testGroupFirst(ConnectionSupplier producerConnectionSupplier,
ConnectionSupplier consumerConnectionSupplier) throws Exception {
+ AtomicInteger sequenceCounter = new AtomicInteger();
+ AtomicInteger consumedSequenceCounter = new AtomicInteger();
+ //Use a queue that IS pre-fixed with GroupFirst so should fall under
Group First address settings
+ String queueName = "GroupFirst." + getQueueName();
+
+ for (int i = 0; i < ITERATIONS; ++i) {
+ try (Connection producerConnection =
producerConnectionSupplier.createConnection();
+ Connection consumerConnection =
consumerConnectionSupplier.createConnection()) {
+ sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT,
sequenceCounter);
+ readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT,
consumedSequenceCounter, this::groupFirstCheck);
+ }
+ }
+ }
+
+ private void groupFirstCheck(int i, Message message) {
+ try {
+ if (i == 0) {
+ assertTrue("Message should be marked with first in Group",
message.getBooleanProperty("JMSXFirstInGroupID"));
+ } else {
+ assertFalse("Message should NOT be marked with first in Group",
message.propertyExists("JMSXFirstInGroupID"));
+ }
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testGroupFirstDefaultOff(ConnectionSupplier
producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier)
throws Exception {
+ AtomicInteger sequenceCounter = new AtomicInteger();
+ AtomicInteger consumedSequenceCounter = new AtomicInteger();
+ //Use a queue that IS NOT pre-fixed with GroupFirst so should fall under
default address settings.
+ String queueName = getQueueName();
for (int i = 0; i < ITERATIONS; ++i) {
try (Connection producerConnection =
producerConnectionSupplier.createConnection();
- Connection consumerConnection =
producerConnectionSupplier.createConnection()) {
- sendMessagesToBroker(producerConnection, MESSAGE_COUNT,
sequenceCounter);
- readMessagesOnBroker(consumerConnection, MESSAGE_COUNT,
consumedSequenceCounter);
+ Connection consumerConnection =
consumerConnectionSupplier.createConnection()) {
+ sendMessagesToBroker(queueName, producerConnection, MESSAGE_COUNT,
sequenceCounter);
+ readMessagesOnBroker(queueName, consumerConnection, MESSAGE_COUNT,
consumedSequenceCounter, this::groupFirstOffCheck);
}
}
}
- protected void readMessagesOnBroker(Connection connection, int count,
AtomicInteger sequence) throws Exception {
+ private void groupFirstOffCheck(int i, Message message) {
+ try {
+ assertFalse("Message should NOT be marked with first in Group",
message.propertyExists("JMSXFirstInGroupID"));
+ } catch (JMSException e) {
+ fail(e.getMessage());
+ }
+ }
+
+
+
+ protected void readMessagesOnBroker(String queueName, Connection
connection, int count, AtomicInteger sequence, BiConsumer<Integer, Message>
additionalCheck) throws Exception {
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(getQueueName());
+ Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < MESSAGE_COUNT; ++i) {
@@ -208,19 +284,19 @@ public class JMSMessageGroupsTest extends
JMSClientTestSupport {
LOG.debug("Message assigned JMSXGroupID := {}", gid);
LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
assertEquals("Sequence order should match",
sequence.incrementAndGet(), seq);
- if (i == 0) {
- assertTrue("Message should be marked with first in Group",
message.getBooleanProperty("JMSXFirstInGroupID"));
- } else {
- assertFalse("Message should NOT be marked with first in Group",
message.propertyExists("JMSXFirstInGroupID"));
+ if (additionalCheck != null) {
+ additionalCheck.accept(i, message);
}
}
session.close();
}
- protected void sendMessagesToBroker(Connection connection, int count,
AtomicInteger sequence) throws Exception {
+
+
+ protected void sendMessagesToBroker(String queueName, Connection
connection, int count, AtomicInteger sequence) throws Exception {
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(getQueueName());
+ Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
byte[] buffer = new byte[MESSAGE_SIZE];
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index 2dc3bcc..ba8cd95 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -30,7 +30,6 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.UUID;
-import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -521,17 +520,10 @@ public class GroupingTest extends JMSTestBase {
ctx.close();
}
- @Test
- public void testDefaultGroupFirstKey() throws Exception {
- testGroupFirstKey(null);
- }
@Test
- public void testCustomGroupFirstKey() throws Exception {
- testGroupFirstKey("my-custom-key");
- }
-
- private void testGroupFirstKey(String customFirstGroupKey) throws Exception
{
+ public void testGroupFirstKey() throws Exception {
+ String customFirstGroupKey = "my-custom-key";
ConnectionFactory fact = getCF();
Assume.assumeFalse("only makes sense withOUT auto-group",
((ActiveMQConnectionFactory) fact).isAutoGroup());
Assume.assumeTrue("only makes sense withOUT explicit group-id",
((ActiveMQConnectionFactory) fact).getGroupID() == null);
@@ -570,7 +562,6 @@ public class GroupingTest extends JMSTestBase {
ctx.commit();
- String firstGroupKey = customFirstGroupKey == null ?
ActiveMQDefaultConfiguration.getDefaultGroupFirstKey().toString() :
customFirstGroupKey;
//First set of msgs should go to the first consumer only
for (int j = 0; j < 10; j++) {
TextMessage tm = (TextMessage) consumer1.receive(10000);
@@ -579,9 +570,9 @@ public class GroupingTest extends JMSTestBase {
assertEquals("Message" + j, tm.getText());
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID1);
if (j == 0) {
- assertTrue(tm.getBooleanProperty(firstGroupKey));
+ assertTrue(tm.getBooleanProperty(customFirstGroupKey));
} else {
- assertFalse(tm.getBooleanProperty(firstGroupKey));
+ assertFalse(tm.propertyExists(customFirstGroupKey));
}
}
@@ -598,9 +589,9 @@ public class GroupingTest extends JMSTestBase {
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID2);
if (j == 10) {
- assertTrue(tm.getBooleanProperty(firstGroupKey));
+ assertTrue(tm.getBooleanProperty(customFirstGroupKey));
} else {
- assertFalse(tm.getBooleanProperty(firstGroupKey));
+ assertFalse(tm.propertyExists(customFirstGroupKey));
}
}
@@ -617,9 +608,9 @@ public class GroupingTest extends JMSTestBase {
assertEquals(tm.getStringProperty("JMSXGroupID"), groupID3);
if (j == 20) {
- assertTrue(tm.getBooleanProperty(firstGroupKey));
+ assertTrue(tm.getBooleanProperty(customFirstGroupKey));
} else {
- assertFalse(tm.getBooleanProperty(firstGroupKey));
+ assertFalse(tm.propertyExists(customFirstGroupKey));
}
}
ctx.commit();