[
https://issues.apache.org/jira/browse/ARTEMIS-4085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Justin Bertram updated ARTEMIS-4085:
------------------------------------
Description:
Currently exclusive last value queues deliver all messages to consumers as
opposed to only the last one.
I wrote the following test:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.stomp;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class StompLVQTest extends StompTestBase {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected StompClientConnection producerConn;
protected StompClientConnection consumerConn;
private final String queue = "lvq";
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server.createQueue(new
QueueConfiguration(queue).setLastValue(true).setExclusive(true));
producerConn = StompClientConnectionFactory.createClientConnection(uri);
consumerConn = StompClientConnectionFactory.createClientConnection(uri);
}
@Override
@After
public void tearDown() throws Exception {
try {
if (producerConn != null && producerConn.isConnected()) {
try {
producerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
producerConn.closeTransport();
}
try {
if (consumerConn != null && consumerConn.isConnected()) {
try {
consumerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
consumerConn.closeTransport();
}
super.tearDown();
}
@Test
public void testLVQ() throws Exception {
producerConn.connect(defUser, defPass);
consumerConn.connect(defUser, defPass);
subscribe(consumerConn, "lvqtest",
Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, queue, true, 0);
try {
for (int i = 1; i <= 100; i++) {
String uuid = UUID.randomUUID().toString();
ClientStompFrame frame =
producerConn.sendFrame(producerConn.createFrame(Stomp.Commands.SEND)
.addHeader(Stomp.Headers.Send.DESTINATION, queue)
.addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
.setBody(String.valueOf(i)));
assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
assertEquals(uuid,
frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
}
} catch (Exception e) {
logger.error(null, e);
}
List<ClientStompFrame> messages = new ArrayList<>();
try {
ClientStompFrame frame;
while ((frame = consumerConn.receiveFrame(10000)) != null) {
assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
ack(consumerConn, null, frame);
messages.add(frame);
}
} catch (Exception e) {
logger.error(null, e);
}
Assert.assertEquals(2, messages.size());
Assert.assertEquals("1", messages.get(0).getBody());
Assert.assertEquals("100", messages.get(1).getBody());
}
}{code}
was:
Currently exclusive last value queues deliver all messages to consumers as
opposed to only the last one.
I wrote the following test:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.stomp;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@RunWith(Parameterized.class)
public class StompLVQTest extends StompTestBase {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected StompClientConnection producerConn;
protected StompClientConnection consumerConn;
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server.createQueue(new
QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true).setExclusive(true));
producerConn = StompClientConnectionFactory.createClientConnection(uri);
consumerConn = StompClientConnectionFactory.createClientConnection(uri);
}
@Override
@After
public void tearDown() throws Exception {
try {
boolean connected = producerConn != null && producerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
producerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
producerConn.closeTransport();
}
try {
boolean connected = consumerConn != null && consumerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
consumerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
consumerConn.closeTransport();
}
}
@Test
public void testLVQ() throws Exception {
final String name = "lvq";
producerConn.connect(defUser, defPass);
consumerConn.connect(defUser, defPass);
subscribe(consumerConn, "lvqtest",
Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
Thread producer = new Thread() {
@Override
public void run() {
try {
for (int i = 1; i <= 100; i++) {
String uuid = UUID.randomUUID().toString();
ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
.addHeader(Stomp.Headers.Send.DESTINATION, name)
.addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
// .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
.setBody(String.valueOf(i));
frame = producerConn.sendFrame(frame);
assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
}
} catch(Exception e) {
logger.error(null, e);
}
}
};
Thread consumer = new Thread() {
@Override
public void run() {
try {
List<ClientStompFrame> messages = new ArrayList<>();
ClientStompFrame frame;
while((frame = consumerConn.receiveFrame(10000)) != null)
{
assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
ack(consumerConn, null, frame);
messages.add(frame);
}
logger.info("Received messages: {}", messages);
Assert.assertEquals(2, messages.size());
Assert.assertEquals("1", messages.get(0).getBody());
Assert.assertEquals("100", messages.get(1).getBody());
} catch(Exception e) {
logger.error(null, e);
}
}
};
producer.start();
producer.join();
consumer.start();
consumer.join();
}
} {code}
> Exclusive LVQ not working as expected
> -------------------------------------
>
> Key: ARTEMIS-4085
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4085
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: STOMP
> Affects Versions: 2.26.0
> Reporter: Lauri Keel
> Assignee: Justin Bertram
> Priority: Major
>
> Currently exclusive last value queues deliver all messages to consumers as
> opposed to only the last one.
> I wrote the following test:
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.activemq.artemis.tests.integration.stomp;
> import java.lang.invoke.MethodHandles;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import org.apache.activemq.artemis.api.core.Message;
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
> import
> org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
> import
> org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
> import
> org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
> import org.junit.After;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> @RunWith(Parameterized.class)
> public class StompLVQTest extends StompTestBase {
> private static final Logger logger =
> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
> protected StompClientConnection producerConn;
> protected StompClientConnection consumerConn;
> private final String queue = "lvq";
> @Override
> @Before
> public void setUp() throws Exception {
> super.setUp();
> server.createQueue(new
> QueueConfiguration(queue).setLastValue(true).setExclusive(true));
> producerConn = StompClientConnectionFactory.createClientConnection(uri);
> consumerConn = StompClientConnectionFactory.createClientConnection(uri);
> }
> @Override
> @After
> public void tearDown() throws Exception {
> try {
> if (producerConn != null && producerConn.isConnected()) {
> try {
> producerConn.disconnect();
> } catch (Exception e) {
> // ignore
> }
> }
> } finally {
> producerConn.closeTransport();
> }
> try {
> if (consumerConn != null && consumerConn.isConnected()) {
> try {
> consumerConn.disconnect();
> } catch (Exception e) {
> // ignore
> }
> }
> } finally {
> consumerConn.closeTransport();
> }
> super.tearDown();
> }
> @Test
> public void testLVQ() throws Exception {
> producerConn.connect(defUser, defPass);
> consumerConn.connect(defUser, defPass);
> subscribe(consumerConn, "lvqtest",
> Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, queue, true, 0);
> try {
> for (int i = 1; i <= 100; i++) {
> String uuid = UUID.randomUUID().toString();
> ClientStompFrame frame =
> producerConn.sendFrame(producerConn.createFrame(Stomp.Commands.SEND)
>
> .addHeader(Stomp.Headers.Send.DESTINATION, queue)
>
> .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
>
> .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
>
> .setBody(String.valueOf(i)));
> assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
> assertEquals(uuid,
> frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
> }
> } catch (Exception e) {
> logger.error(null, e);
> }
> List<ClientStompFrame> messages = new ArrayList<>();
> try {
> ClientStompFrame frame;
> while ((frame = consumerConn.receiveFrame(10000)) != null) {
> assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
> ack(consumerConn, null, frame);
> messages.add(frame);
> }
> } catch (Exception e) {
> logger.error(null, e);
> }
> Assert.assertEquals(2, messages.size());
> Assert.assertEquals("1", messages.get(0).getBody());
> Assert.assertEquals("100", messages.get(1).getBody());
> }
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)