This is an automated email from the ASF dual-hosted git repository.
tabish121 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 53173375c4 ARTEMIS-5996 refactor STOMP to use checkAutoCreate from
ServerSession
53173375c4 is described below
commit 53173375c4d5e4b57890e89d37ed8b666c974474
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Apr 13 11:03:25 2026 -0500
ARTEMIS-5996 refactor STOMP to use checkAutoCreate from ServerSession
The STOMP implementation duplicates much of ServerSession's logic for
auto-creating addresses and queues. It should be refactored to call
ServerSession's checkAutoCreate method instead.
---
.../stomp/ActiveMQStompProtocolMessageBundle.java | 5 +-
.../core/protocol/stomp/StompConnection.java | 126 ++++++++-------------
.../protocol/stomp/VersionedStompFrameHandler.java | 4 +-
.../tests/integration/stomp/StompLVQTest.java | 3 +-
.../tests/integration/stomp/StompTestBase.java | 14 +++
.../integration/stomp/StompWithSecurityTest.java | 58 ++++++++++
.../tests/integration/stomp/v12/StompV12Test.java | 10 +-
7 files changed, 129 insertions(+), 91 deletions(-)
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index 71595d031d..c761914d53 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -23,7 +23,7 @@ import org.apache.activemq.artemis.logs.BundleFactory;
/**
* Logger Codes 339000 - 339999
*/
-@LogBundle(projectCode = "AMQ", regexID = "339[0-9]{3}")
+@LogBundle(projectCode = "AMQ", regexID = "339[0-9]{3}", retiredIDs = {339041})
public interface ActiveMQStompProtocolMessageBundle {
ActiveMQStompProtocolMessageBundle BUNDLE =
BundleFactory.newBundle(ActiveMQStompProtocolMessageBundle.class);
@@ -144,7 +144,4 @@ public interface ActiveMQStompProtocolMessageBundle {
@Message(id = 339040, value = "Undefined escape sequence: {}")
ActiveMQStompException undefinedEscapeSequence(String sequence);
-
- @Message(id = 339041, value = "Not allowed to specify {} semantics on {}
address.")
- ActiveMQStompException illegalSemantics(String requested, String exists);
}
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 3f7636226d..5bd009477d 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -33,12 +32,11 @@ import
java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import
org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
+import org.apache.activemq.artemis.api.core.AutoCreateResult;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import
org.apache.activemq.artemis.core.protocol.stomp.v10.StompFrameHandlerV10;
@@ -47,16 +45,12 @@ import
org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import
org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
-import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.VersionLoader;
@@ -166,63 +160,6 @@ public final class StompConnection extends
AbstractRemotingConnection {
this.minLargeMessageSize =
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE,
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED,
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
acceptorUsed.getConfiguration()), acceptorUsed.getConfiguration());
}
- // TODO this should take a type - send or receive so it knows whether to
check the address or the queue
- public void checkDestination(String destination) throws
ActiveMQStompException {
- if (!manager.destinationExists(destination)) {
- throw
BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
- }
- }
-
- public void autoCreateDestinationIfPossible(String destination, RoutingType
routingType) throws ActiveMQStompException {
- try {
- SimpleString simpleDestination = SimpleString.of(destination);
- AddressInfo addressInfo =
manager.getServer().getAddressInfo(simpleDestination);
- AddressSettings addressSettings =
manager.getServer().getAddressSettingsRepository().getMatch(destination);
- RoutingType effectiveAddressRoutingType =
Objects.requireNonNullElse(routingType,
addressSettings.getDefaultAddressRoutingType());
- ServerSession session = getSession().getCoreSession();
- /*
- * If the address doesn't exist then it is created if possible.
- * If the address does exist but doesn't support the routing-type
then the address is updated if possible.
- */
- if (addressInfo == null) {
- if (addressSettings.isAutoCreateAddresses()) {
- session.createAddress(simpleDestination,
effectiveAddressRoutingType, true);
- }
- } else if
(!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
- if (addressSettings.isAutoCreateAddresses()) {
- EnumSet<RoutingType> routingTypes =
EnumSet.noneOf(RoutingType.class);
- for (RoutingType existingRoutingType :
addressInfo.getRoutingTypes()) {
- routingTypes.add(existingRoutingType);
- }
- routingTypes.add(effectiveAddressRoutingType);
- manager.getServer().updateAddressInfo(simpleDestination,
routingTypes);
- }
- }
-
- // auto create the queue if the address is ANYCAST or FQQN
- if ((CompositeAddress.isFullyQualified(destination) ||
effectiveAddressRoutingType == RoutingType.ANYCAST) &&
addressSettings.isAutoCreateQueues() &&
manager.getServer().locateQueue(simpleDestination) == null) {
-
session.createQueue(QueueConfiguration.of(destination).setRoutingType(effectiveAddressRoutingType).setAutoCreated(true));
- }
- } catch (ActiveMQQueueExistsException e) {
- // ignore
- } catch (Exception e) {
- logger.debug("Exception while auto-creating destination", e);
- throw new ActiveMQStompException(e.getMessage(),
e).setHandler(frameHandler);
- }
- }
-
- public void checkRoutingSemantics(String destination, RoutingType
routingType) throws ActiveMQStompException {
- AddressInfo addressInfo =
manager.getServer().getAddressInfo(SimpleString.of(destination));
-
- // may be null here if, for example, the management address is being
checked
- if (addressInfo != null) {
- Set<RoutingType> actualDeliveryModesOfAddress =
addressInfo.getRoutingTypes();
- if (routingType != null &&
!actualDeliveryModesOfAddress.contains(routingType)) {
- throw BUNDLE.illegalSemantics(routingType.toString(),
actualDeliveryModesOfAddress.toString());
- }
- }
- }
-
@Override
public void destroy() {
if (DESTROYED_UPDATER.compareAndSet(this, 0, 1)) {
@@ -551,9 +488,48 @@ public final class StompConnection extends
AbstractRemotingConnection {
RoutingType subscriptionType,
Integer consumerWindowSize) throws
ActiveMQStompException {
validateSelector(selector);
- autoCreateDestinationIfPossible(destination, subscriptionType);
- checkDestination(destination);
- checkRoutingSemantics(destination, subscriptionType);
+ checkAutoCreate(destination, subscriptionType);
+ String subscriptionID = getSubscriptionID(destination, id);
+
+ try {
+ return manager.subscribe(this,
+ subscriptionID,
+ durableSubscriptionName,
+ destination,
+ getSelector(selector, noLocal),
+ Objects.requireNonNullElse(ack,
Stomp.Headers.Subscribe.AckModeValues.AUTO),
+ noLocal,
+ consumerWindowSize);
+ } catch (ActiveMQStompException e) {
+ throw e;
+ } catch (Exception e) {
+ throw BUNDLE.errorCreatingSubscription(subscriptionID,
e).setHandler(frameHandler);
+ }
+ }
+
+ protected void checkAutoCreate(String destination, RoutingType
subscriptionType) throws ActiveMQStompException {
+ AutoCreateResult autoCreateResult;
+ try {
+ RoutingType routingType = getSubscriptionRoutingType(destination,
subscriptionType);
+ autoCreateResult =
getSession().getCoreSession().checkAutoCreate(QueueConfiguration.of(destination).setRoutingType(routingType));
+ } catch (Exception e) {
+ logger.debug("Exception while auto-creating destination", e);
+ throw new ActiveMQStompException(e.getMessage(),
e).setHandler(frameHandler);
+ }
+ if (autoCreateResult == AutoCreateResult.NOT_FOUND) {
+ throw
BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
+ }
+ }
+
+ private RoutingType getSubscriptionRoutingType(String destination,
RoutingType subscriptionType) {
+ if (subscriptionType == null) {
+ return
getManager().getServer().getAddressSettingsRepository().getMatch(destination).getDefaultAddressRoutingType();
+ } else {
+ return subscriptionType;
+ }
+ }
+
+ private String getSelector(String selector, boolean noLocal) {
if (noLocal) {
String noLocalFilter = "(" + CONNECTION_ID_PROPERTY_NAME_STRING + "
<> '" + getID().toString() + "' OR " + CONNECTION_ID_PROPERTY_NAME_STRING + "
IS NULL)";
if (selector == null) {
@@ -562,11 +538,10 @@ public final class StompConnection extends
AbstractRemotingConnection {
selector = "(" + selector + ") AND " + noLocalFilter;
}
}
+ return selector;
+ }
- if (ack == null) {
- ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
- }
-
+ private String getSubscriptionID(String destination, String id) throws
ActiveMQStompException {
String subscriptionID = null;
if (id != null) {
subscriptionID = id;
@@ -576,14 +551,7 @@ public final class StompConnection extends
AbstractRemotingConnection {
}
subscriptionID = "subscription/" + destination;
}
-
- try {
- return manager.subscribe(this, subscriptionID,
durableSubscriptionName, destination, selector, ack, noLocal,
consumerWindowSize);
- } catch (ActiveMQStompException e) {
- throw e;
- } catch (Exception e) {
- throw BUNDLE.errorCreatingSubscription(subscriptionID,
e).setHandler(frameHandler);
- }
+ return subscriptionID;
}
private void validateSelector(String selector) throws
ActiveMQStompException {
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 1f09cc9c6b..14790aa049 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -193,9 +193,7 @@ public abstract class VersionedStompFrameHandler {
connection.validate();
String destination = getDestination(frame);
RoutingType routingType =
getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE),
frame.getHeader(Headers.Send.DESTINATION));
- connection.autoCreateDestinationIfPossible(destination, routingType);
- connection.checkDestination(destination);
- connection.checkRoutingSemantics(destination, routingType);
+ connection.checkAutoCreate(destination, routingType);
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
long timestamp = System.currentTimeMillis();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
index 05b14477c8..a98f928d9b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
@@ -53,7 +54,7 @@ public class StompLVQTest extends StompTestBase {
public void setUp() throws Exception {
super.setUp();
-
server.createQueue(QueueConfiguration.of(queue).setLastValue(true).setExclusive(true));
+
server.createQueue(QueueConfiguration.of(queue).setLastValue(true).setExclusive(true).setRoutingType(RoutingType.ANYCAST));
producerConn = StompClientConnectionFactory.createClientConnection(uri);
consumerConn = StompClientConnectionFactory.createClientConnection(uri);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index cd190c6fed..15f4425dcd 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -93,6 +93,10 @@ public abstract class StompTestBase extends ActiveMQTestBase
{
protected String defPass = "wombats";
+ protected String onlySendCredential = "onlySend";
+
+ protected String onlyConsumeCredential = "onlyConsume";
+
public StompTestBase(String scheme) {
this.scheme = scheme;
}
@@ -211,6 +215,16 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
final String role = "testRole";
securityManager.getConfiguration().addRole(defUser, role);
config.getSecurityRoles().put("#", new HashSet<Role>(Set.of(new
Role(role, true, true, true, true, true, true, true, true, true, true, false,
false))));
+
+ final String onlySend = onlySendCredential;
+ securityManager.getConfiguration().addUser(onlySend, onlySend);
+ securityManager.getConfiguration().addRole(onlySend, onlySend);
+ config.getSecurityRoles().get("#").add(new Role(onlySend, true,
false, false, false, false, false, false, false, false, false, false, false));
+
+ final String onlyConsume = onlyConsumeCredential;
+ securityManager.getConfiguration().addUser(onlyConsume, onlyConsume);
+ securityManager.getConfiguration().addRole(onlyConsume, onlyConsume);
+ config.getSecurityRoles().get("#").add(new Role(onlyConsume, false,
true, false, false, false, false, false, false, false, false, false, false));
}
return activeMQServer;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithSecurityTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithSecurityTest.java
index d7af6ad553..d6369b9717 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithSecurityTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompWithSecurityTest.java
@@ -18,13 +18,18 @@ package org.apache.activemq.artemis.tests.integration.stomp;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
+import java.util.UUID;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -68,4 +73,57 @@ public class StompWithSecurityTest extends StompTestBase {
long tmsg = message.getJMSTimestamp();
assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+
+ @Test
+ public void testSendMessageWithDifferentRoutingType() throws Exception {
+ // validate presuppositions
+ SimpleString queueName = SimpleString.of(getQueuePrefix() +
getQueueName());
+ assertNotNull(server.locateQueue(queueName));
+
assertTrue(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.ANYCAST));
+
assertFalse(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.MULTICAST));
+
+ StompClientConnection conn =
StompClientConnectionFactory.createClientConnection(uri);
+ conn.connect(onlySendCredential, onlySendCredential);
+
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND);
+ frame.addHeader(Stomp.Headers.Send.DESTINATION, queueName.toString());
+ frame.setBody("Hello World");
+ frame.addHeader(Stomp.Headers.Send.DESTINATION_TYPE,
RoutingType.MULTICAST.toString());
+ frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED,
UUID.randomUUID().toString());
+ ClientStompFrame result = conn.sendFrame(frame);
+ assertNotNull(result);
+ assertEquals(Stomp.Responses.ERROR, result.getCommand());
+
assertTrue(result.getHeader(Stomp.Headers.Error.MESSAGE).contains("AMQ229032"));
+
+ conn.disconnect();
+
+
assertTrue(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.ANYCAST));
+
assertFalse(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.MULTICAST));
+ }
+
+ @Test
+ public void testSubscribeWithDifferentRoutingType() throws Exception {
+ // validate presuppositions
+ SimpleString queueName = SimpleString.of(getQueuePrefix() +
getQueueName());
+ assertNotNull(server.locateQueue(queueName));
+
assertTrue(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.ANYCAST));
+
assertFalse(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.MULTICAST));
+
+ StompClientConnection conn =
StompClientConnectionFactory.createClientConnection(uri);
+ conn.connect(onlyConsumeCredential, onlyConsumeCredential);
+
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE);
+ frame.addHeader(Stomp.Headers.Subscribe.DESTINATION,
queueName.toString());
+ frame.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE,
RoutingType.MULTICAST.toString());
+ frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED,
UUID.randomUUID().toString());
+ ClientStompFrame result = conn.sendFrame(frame);
+ assertNotNull(result);
+ assertEquals(Stomp.Responses.ERROR, result.getCommand());
+
assertTrue(result.getHeader(Stomp.Headers.Error.MESSAGE).contains("AMQ229032"));
+
+ conn.disconnect();
+
+
assertTrue(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.ANYCAST));
+
assertFalse(server.getAddressInfo(queueName).getRoutingTypes().contains(RoutingType.MULTICAST));
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index c8f656f600..23f16eef51 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -399,8 +399,9 @@ public class StompV12Test extends StompTestBase {
String cLen =
String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
-
.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() +
getQueueName())
-
.addHeader(Stomp.Headers.Subscribe.DESTINATION, "aNonexistentQueue")
+ .addHeader(Stomp.Headers.Send.DESTINATION,
getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.Send.DESTINATION,
"aNonexistentQueue")
+
.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.ANYCAST.toString())
.addHeader(Stomp.Headers.CONTENT_TYPE,
"application/xml")
.addHeader(Stomp.Headers.CONTENT_LENGTH,
cLen)
.addHeader("foo", "value1")
@@ -435,8 +436,9 @@ public class StompV12Test extends StompTestBase {
cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
frame = conn.createFrame(Stomp.Commands.SEND)
- .addHeader(Stomp.Headers.Subscribe.DESTINATION,
"aNonexistentQueue")
- .addHeader(Stomp.Headers.Subscribe.DESTINATION,
getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.Send.DESTINATION,
"aNonexistentQueue")
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix()
+ getQueueName())
+ .addHeader(Stomp.Headers.Send.DESTINATION_TYPE,
RoutingType.ANYCAST.toString())
.addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml")
.addHeader(Stomp.Headers.CONTENT_LENGTH, cLen)
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, "1234")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]