This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.27.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit f1030ddc19c0b317164b4e20aa1b8af66bb3b659 Author: Justin Bertram <[email protected]> AuthorDate: Mon Nov 7 15:07:35 2022 -0600 ARTEMIS-4085 exclusive LVQ sending all messages to consumer (cherry picked from commit ca580814de2c2f4466d00e76d5cf70935a94ff81) --- .../artemis/core/server/impl/QueueImpl.java | 1 + .../artemis/tests/integration/server/LVQTest.java | 24 ++++ .../tests/integration/stomp/StompLVQTest.java | 134 +++++++++++++++++++++ 3 files changed, 159 insertions(+) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index d222d192fd..488109c83e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3155,6 +3155,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (groupConsumer != null) { if (noDelivery > 0) { + pruneLastValues(); break; } noDelivery = 0; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index a37b690d4d..4c8d14a34d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -86,6 +86,30 @@ public class LVQTest extends ActiveMQTestBase { Assert.assertEquals(m.getBodyBuffer().readString(), "m2"); } + @Test + public void testSimpleExclusive() throws Exception { + ServerLocator locator = createNettyNonHALocator().setConsumerWindowSize(0); + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession clientSession = addClientSession(sf.createSession(false, true, true)); + final String EXCLUSIVE_QUEUE = "exclusiveQueue"; + + clientSession.createQueue(new QueueConfiguration(EXCLUSIVE_QUEUE).setExclusive(true).setLastValue(true)); + ClientProducer producer = clientSession.createProducer(EXCLUSIVE_QUEUE); + ClientConsumer consumer = clientSession.createConsumer(EXCLUSIVE_QUEUE); + clientSession.start(); + ClientMessage m1 = createTextMessage(clientSession, "m1"); + SimpleString rh = new SimpleString("SMID1"); + m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + ClientMessage m2 = createTextMessage(clientSession, "m2"); + m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); + producer.send(m1); + producer.send(m2); + ClientMessage m = consumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m2"); + } + @Test public void testSimpleRestart() throws Exception { ClientProducer producer = clientSession.createProducer(address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java new file mode 100644 index 0000000000..8004199848 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.stomp; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class StompLVQTest extends StompTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + protected StompClientConnection producerConn; + protected StompClientConnection consumerConn; + + private final String queue = "lvq"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + server.createQueue(new QueueConfiguration(queue).setLastValue(true).setExclusive(true)); + + producerConn = StompClientConnectionFactory.createClientConnection(uri); + consumerConn = StompClientConnectionFactory.createClientConnection(uri); + } + + @Override + @After + public void tearDown() throws Exception { + try { + if (producerConn != null && producerConn.isConnected()) { + try { + producerConn.disconnect(); + } catch (Exception e) { + // ignore + } + } + } finally { + producerConn.closeTransport(); + } + + try { + if (consumerConn != null && consumerConn.isConnected()) { + try { + consumerConn.disconnect(); + } catch (Exception e) { + // ignore + } + } + } finally { + consumerConn.closeTransport(); + } + + super.tearDown(); + } + + @Test + public void testLVQ() throws Exception { + + producerConn.connect(defUser, defPass); + consumerConn.connect(defUser, defPass); + + subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, queue, true, 0); + + try { + for (int i = 1; i <= 100; i++) { + String uuid = UUID.randomUUID().toString(); + + ClientStompFrame frame = producerConn.sendFrame(producerConn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, queue) + .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test") + .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid) + .setBody(String.valueOf(i))); + + assertEquals(Stomp.Responses.RECEIPT, frame.getCommand()); + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + } + } catch (Exception e) { + logger.error(null, e); + } + + List<ClientStompFrame> messages = new ArrayList<>(); + try { + ClientStompFrame frame; + + while ((frame = consumerConn.receiveFrame(10000)) != null) { + assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + + ack(consumerConn, null, frame); + + messages.add(frame); + } + } catch (Exception e) { + logger.error(null, e); + } + + Assert.assertEquals(2, messages.size()); + Assert.assertEquals("1", messages.get(0).getBody()); + Assert.assertEquals("100", messages.get(1).getBody()); + } +} \ No newline at end of file
