This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 65db6c60c9 ARTEMIS-5074 Fix encoding of bytes properties as Binary in 
AMQPMessage
65db6c60c9 is described below

commit 65db6c60c9869f661967dad1b6872c8fd82753de
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Sep 27 12:08:45 2024 -0400

    ARTEMIS-5074 Fix encoding of bytes properties as Binary in AMQPMessage
    
    When a bytes property is added to an AMQPMessage and it is then reencoded it
    will fail without first wrapping the byte array in an AMQP binary as 
required
    by the ApplicationProperties section specification defined type allowances.
---
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  50 +++++--
 .../protocol/amqp/broker/AMQPMessageTest.java      | 135 ++++++++++++++++++
 .../amqp/AmqpReceiverDispositionTest.java          |  78 ++++++++++-
 .../amqp/AmqpSendReceiveInterceptorTest.java       | 152 ++++++++++++++++++++-
 4 files changed, 405 insertions(+), 10 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 52b3d946ae..025e6f5a83 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -1538,7 +1538,6 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
       return (Long) 
getMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION);
    }
 
-
    // JMS Style property access methods.  These can result in additional 
decode of AMQP message
    // data from Application properties.  Updates to application properties 
puts the message in a
    // dirty state and requires a re-encode of the data to update all buffer 
state data otherwise
@@ -1617,14 +1616,18 @@ public abstract class AMQPMessage extends 
RefCountMessage implements org.apache.
    private Object getApplicationObjectProperty(String key) {
       Object value = getApplicationPropertiesMap(false).get(key);
       if (value instanceof Number) {
-         // slow path
+         // AMQP Numeric types must be converted to a compatible value.
          if (value instanceof UnsignedInteger ||
-            value instanceof UnsignedByte ||
-            value instanceof UnsignedLong ||
-            value instanceof UnsignedShort) {
+             value instanceof UnsignedByte ||
+             value instanceof UnsignedLong ||
+             value instanceof UnsignedShort) {
             return ((Number) value).longValue();
          }
+      } else if (value instanceof Binary) {
+         // Binary wrappers must be unwrapped into a byte[] form.
+         return getBytesProperty(key);
       }
+
       return value;
    }
 
@@ -1671,7 +1674,25 @@ public abstract class AMQPMessage extends 
RefCountMessage implements org.apache.
 
    @Override
    public final byte[] getBytesProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (byte[]) getApplicationPropertiesMap(false).get(key);
+      final Object value = getApplicationPropertiesMap(false).get(key);
+
+      if (value instanceof Binary) {
+         final Binary binary = (Binary) value;
+
+         if (binary.getArray() == null) {
+            return null;
+         } else if (binary.getArrayOffset() == 0 && binary.getLength() == 
binary.getArray().length) {
+            return binary.getArray();
+         } else {
+            final byte[] payload = new byte[binary.getLength()];
+
+            System.arraycopy(binary.getArray(), binary.getArrayOffset(), 
payload, 0, binary.getLength());
+
+            return payload;
+         }
+      } else {
+         return (byte[]) value;
+      }
    }
 
    @Override
@@ -1741,7 +1762,14 @@ public abstract class AMQPMessage extends 
RefCountMessage implements org.apache.
 
    @Override
    public final org.apache.activemq.artemis.api.core.Message 
putBytesProperty(String key, byte[] value) {
-      getApplicationPropertiesMap(true).put(key, value);
+      Binary payload = null;
+
+      if (value != null) {
+         payload = new Binary(value);
+      }
+
+      getApplicationPropertiesMap(true).put(key, payload);
+
       return this;
    }
 
@@ -1835,7 +1863,13 @@ public abstract class AMQPMessage extends 
RefCountMessage implements org.apache.
 
    @Override
    public final org.apache.activemq.artemis.api.core.Message 
putObjectProperty(String key, Object value) throws 
ActiveMQPropertyConversionException {
-      getApplicationPropertiesMap(true).put(key, value);
+      if (value instanceof byte[]) {
+         // Prevent error from proton encoding, byte array must be wrapped in 
a Binary type.
+         putBytesProperty(key, (byte[]) value);
+      } else {
+         getApplicationPropertiesMap(true).put(key, value);
+      }
+
       return this;
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
index 26722de5d2..6f54217304 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java
@@ -2690,8 +2690,143 @@ public class AMQPMessageTest {
       assertEquals("test-ma", 
readMessageAnnotations.getValue().get(Symbol.valueOf("test-ma")));
    }
 
+   @Test
+   public void testPutAndGetOfMessageBytesProperties() {
+      final byte[] empty = new byte[0];
+      final byte[] array = new byte[] {0, 1, 2, 3};
+
+      final AMQPStandardMessage message = new AMQPStandardMessage(0, 
encodedProtonMessage, null);
+
+      assertNull(message.getBytesProperty("test"));
+
+      // Empty byte array
+      message.putBytesProperty("test", empty);
+      final byte[] emptyResult = message.getBytesProperty("test");
+      assertArrayEquals(empty, emptyResult);
+
+      // Populated byte array
+      message.putBytesProperty("test", array);
+      final byte[] arrayResult = message.getBytesProperty("test");
+      assertArrayEquals(array, arrayResult);
+
+      // null value set and get
+      message.putBytesProperty("test", null);
+
+      assertNull(message.getBytesProperty("test"));
+   }
+
+   @Test
+   public void 
testPutAndGetOfMessageBytesPropertiesViaObjectPropertyAccessor() {
+      final byte[] empty = new byte[0];
+      final byte[] array = new byte[] {0, 1, 2, 3};
+
+      final AMQPStandardMessage message = new AMQPStandardMessage(0, 
encodedProtonMessage, null);
+
+      assertNull(message.getObjectProperty("test"));
+
+      // Empty byte array
+      message.putObjectProperty("test", empty);
+      final byte[] emptyResult = (byte[]) message.getObjectProperty("test");
+      assertArrayEquals(empty, emptyResult);
+
+      // Populated byte array
+      message.putObjectProperty("test", array);
+      final byte[] arrayResult = (byte[]) message.getObjectProperty("test");
+      assertArrayEquals(array, arrayResult);
+
+      // null value set and get
+      message.putObjectProperty("test", null);
+
+      assertNull(message.getObjectProperty("test"));
+   }
+
+   @Test
+   public void testGetBytesPropertyHandlesOffsetArrayInBinary() {
+      final byte[] array = new byte[] {0, 1, 2, 3};
+      final byte[] offsetArray = new byte[] {1, 0, 1, 2, 3, 4};
+      final Binary offsetBinary = new Binary(offsetArray, 1, array.length);
+
+      final AMQPMessageTestExtension message = new AMQPMessageTestExtension(0, 
encodedProtonMessage, null);
+
+      assertNull(message.getBytesProperty("test"));
+
+      final ApplicationProperties applicationProperties = 
message.getDecodedApplicationProperties();
+      assertNotNull(applicationProperties.getValue());
+
+      applicationProperties.getValue().put("test", offsetBinary);
+
+      assertTrue(message.containsProperty("test"));
+
+      final byte[] arrayResult = message.getBytesProperty("test");
+
+      assertArrayEquals(array, arrayResult);
+   }
+
+   @Test
+   public void testGetBytesFromObjectPropertyHandlesOffsetArrayInBinary() {
+      final byte[] array = new byte[] {0, 1, 2, 3};
+      final byte[] offsetArray = new byte[] {1, 0, 1, 2, 3, 4};
+      final Binary offsetBinary = new Binary(offsetArray, 1, array.length);
+
+      final AMQPMessageTestExtension message = new AMQPMessageTestExtension(0, 
encodedProtonMessage, null);
+
+      assertNull(message.getObjectProperty("test"));
+
+      final ApplicationProperties applicationProperties = 
message.getDecodedApplicationProperties();
+      assertNotNull(applicationProperties.getValue());
+
+      applicationProperties.getValue().put("test", offsetBinary);
+
+      assertTrue(message.containsProperty("test"));
+
+      final Object result = message.getObjectProperty("test");
+
+      assertTrue(result instanceof byte[]);
+      assertArrayEquals(array, (byte[]) result);
+   }
+
+   @Test
+   public void testBytesPropertySetAndReencode() {
+      final byte[] array = new byte[] {0, 1, 2, 3};
+
+      final AMQPStandardMessage message = new AMQPStandardMessage(0, 
encodedProtonMessage, null);
+
+      assertNull(message.getBytesProperty("test"));
+
+      message.putBytesProperty("test", array);
+      message.reencode();
+
+      assertTrue(message.containsProperty("test"));
+
+      // Decodes the value from the AMQP data encoding and then checks on the 
contents
+      final Binary result = (Binary) 
message.getApplicationProperties().getValue().get("test");
+
+      assertNotNull(result);
+      assertEquals(array.length, result.getLength());
+
+      // Safe copy to account for possible offset array
+      final byte[] copy = new byte[array.length];
+      System.arraycopy(result.getArray(), result.getArrayOffset(), copy, 0, 
result.getLength());
+
+      assertArrayEquals(array, copy);
+   }
+
    //----- Test Support 
------------------------------------------------------//
 
+   // Extension allows public access to decoded application properties that is 
otherwise
+   // not accessible directly.
+   private class AMQPMessageTestExtension extends AMQPStandardMessage {
+
+      AMQPMessageTestExtension(long messageFormat, byte[] data, 
TypedProperties extraProperties) {
+         super(messageFormat, data, extraProperties, null);
+      }
+
+      @Override
+      public ApplicationProperties getDecodedApplicationProperties() {
+         return applicationProperties;
+      }
+   }
+
    private MessageImpl createProtonMessage() {
       MessageImpl message = (MessageImpl) Proton.message();
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index 5a04044e22..33c0109bc2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -19,13 +19,18 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.proton.message.Message;
 import org.junit.jupiter.api.Test;
@@ -40,6 +45,13 @@ import org.junit.jupiter.api.Timeout;
 
 public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
 
+   private final int MIN_LARGE_MESSAGE_SIZE = 2048;
+
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE);
+   }
+
    @Test
    @Timeout(30)
    public void testReleasedDisposition() throws Exception {
@@ -187,4 +199,68 @@ public class AmqpReceiverDispositionTest extends 
AmqpClientTestSupport {
 
       connection.close();
    }
+
+   @Test
+   @Timeout(30)
+   public void testReplayMessageForFQQNRejectedMessage() throws Exception {
+      doTestReplayMessageForFQQNRejectedMessage(10);
+   }
+
+   @Test
+   @Timeout(30)
+   public void testReplayMessageForFQQNRejectedLargeMessage() throws Exception 
{
+      doTestReplayMessageForFQQNRejectedMessage(MIN_LARGE_MESSAGE_SIZE);
+   }
+
+   public void doTestReplayMessageForFQQNRejectedMessage(int payloadSize) 
throws Exception {
+      server.createQueue(QueueConfiguration.of("A1").setAddress("A")
+                                                    
.setRoutingType(RoutingType.MULTICAST)
+                                                    .setDurable(true));
+
+      final String targetFQQN = "A::A1";
+      final AmqpClient client = createAmqpClient();
+      final AmqpConnection connection = addConnection(client.connect());
+      final AmqpSession session = connection.createSession();
+      final AmqpSender sender = session.createSender(targetFQQN);
+      final AmqpMessage message = new AmqpMessage();
+
+      final String payload = "#".repeat(payloadSize);
+
+      message.setMessageId("MSG:1");
+      message.setText("Test-Message: " + payload);
+      message.setDurable(true);
+
+      sender.send(message);
+
+      final AmqpReceiver receiver = session.createReceiver(targetFQQN);
+      receiver.flow(1);
+
+      final AmqpMessage rejected = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(rejected, "Did not receive message that we want to 
reject");
+
+      rejected.reject();
+
+      final Queue queueView = server.locateQueue(targetFQQN);
+      final Queue dlqView = server.locateQueue("ActiveMQ.DLQ");
+
+      Wait.assertEquals(0L, () -> queueView.getMessageCount(), 5000, 100);
+      Wait.assertEquals(1L, () -> dlqView.getMessageCount(), 5000, 100);
+
+      // This call with the message sent to an FQQN results in a new 
application
+      // property being added whose payload is a byte array
+
+      dlqView.retryMessages(null); // No filter so all should match
+
+      Wait.assertEquals(0L, () -> dlqView.getMessageCount(), 5000, 100);
+      Wait.assertEquals(1L, () -> queueView.getMessageCount(), 5000, 100);
+
+      receiver.flow(2);
+
+      final AmqpMessage retriedMessage = receiver.receive(5, TimeUnit.SECONDS);
+
+      assertNotNull(retriedMessage);
+      assertEquals("MSG:1", retriedMessage.getMessageId());
+
+      connection.close();
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
index 04563d00d6..31c1fde12f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveInterceptorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -32,15 +33,18 @@ import 
org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Test basic send and receive scenarios using only AMQP sender and receiver 
links.
@@ -166,7 +170,6 @@ public class AmqpSendReceiveInterceptorTest extends 
AmqpClientTestSupport {
    private static final String REPLY_TO = "replyTo";
    private static final String TIME_TO_LIVE = "timeToLive";
 
-
    private boolean checkMessageProperties(AMQPMessage message, Map<String, 
Object> expectedProperties) {
       assertNotNull(message);
       assertNotNull(server.getNodeID());
@@ -301,4 +304,151 @@ public class AmqpSendReceiveInterceptorTest extends 
AmqpClientTestSupport {
       receiver.close();
       connection.close();
    }
+
+   @Test
+   @Timeout(60)
+   public void testReencodeMessageWithByteArrayPropertyAddedOnIngress() throws 
Exception {
+      final CountDownLatch arrived = new CountDownLatch(1);
+      final CountDownLatch departed = new CountDownLatch(1);
+      final AtomicBoolean propertyAddedOnReceive = new AtomicBoolean(false);
+      final AtomicBoolean propertyFoundOnDispatch = new AtomicBoolean(false);
+
+      final String BYTE_PROPERTY_KEY = "byte-property";
+      final byte[] BYTE_PROPERTY_VALUE = 
"test".getBytes(StandardCharsets.UTF_8);
+
+      server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() 
{
+
+         @Override
+         public boolean intercept(AMQPMessage message, RemotingConnection 
connection) throws ActiveMQException {
+            message.putBytesProperty(BYTE_PROPERTY_KEY, BYTE_PROPERTY_VALUE);
+
+            try {
+               message.reencode();
+               propertyAddedOnReceive.set(true);
+            } catch (Exception ex) {
+               return false; // Reject message if updated encode fails, test 
will fail
+            } finally {
+               arrived.countDown();
+            }
+
+            return true;
+         }
+      });
+
+      final AmqpClient client = createAmqpClient();
+      final AmqpConnection connection = addConnection(client.connect());
+      final AmqpSession session = connection.createSession();
+      final AmqpSender sender = session.createSender(getTestName());
+      final AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("MSG:1");
+      message.setText("Test-Message");
+
+      sender.send(message);
+
+      assertTrue(arrived.await(2, TimeUnit.SECONDS));
+      assertTrue(propertyAddedOnReceive.get());
+
+      server.getRemotingService().addOutgoingInterceptor(new AmqpInterceptor() 
{
+
+         @Override
+         public boolean intercept(AMQPMessage packet, RemotingConnection 
connection) throws ActiveMQException {
+            if (packet.containsProperty(BYTE_PROPERTY_KEY)) {
+               propertyFoundOnDispatch.set(true);
+            }
+
+            departed.countDown();
+            return true;
+         }
+      });
+
+      final AmqpReceiver receiver = session.createReceiver(getTestName());
+      receiver.flow(2);
+
+      final AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(amqpMessage);
+      assertEquals(departed.getCount(), 0);
+      assertTrue(propertyFoundOnDispatch.get());
+      assertNotNull(amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY));
+      assertTrue(amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY) 
instanceof Binary);
+
+      final Binary binary = (Binary) 
amqpMessage.getApplicationProperty(BYTE_PROPERTY_KEY);
+      assertEquals(BYTE_PROPERTY_VALUE.length, binary.getLength());
+
+      // Make a safe copy in case binary payload is not in exact sized array.
+      final byte[] copy = new byte[BYTE_PROPERTY_VALUE.length];
+      System.arraycopy(binary.getArray(), binary.getArrayOffset(), copy, 0, 
binary.getLength());
+
+      assertArrayEquals(BYTE_PROPERTY_VALUE, copy);
+
+      sender.close();
+      receiver.close();
+      connection.close();
+   }
+
+   @Test
+   @Timeout(60)
+   public void testGetBytesPropertyReturnsByteArray() throws Exception {
+      
doTestGetBytesPropertyReturnsByteArray("array-payload".getBytes(StandardCharsets.UTF_8));
+   }
+
+   @Test
+   @Timeout(60)
+   public void testGetBytesPropertyReturnsEmptyByteArray() throws Exception {
+      doTestGetBytesPropertyReturnsByteArray(new byte[0]);
+   }
+
+   public void doTestGetBytesPropertyReturnsByteArray(byte[] array) throws 
Exception {
+
+      final CountDownLatch arrived = new CountDownLatch(1);
+      final AtomicBoolean bytesReturnedFromBrokerMessage = new 
AtomicBoolean(false);
+
+      final String BYTE_PROPERTY_KEY = "byte-property";
+
+      server.getRemotingService().addIncomingInterceptor(new AmqpInterceptor() 
{
+
+         @Override
+         public boolean intercept(AMQPMessage message, RemotingConnection 
connection) throws ActiveMQException {
+            try {
+               final Object appProperty = 
message.getApplicationProperties().getValue().get(BYTE_PROPERTY_KEY);
+
+               // The application property should return the encoded Binary 
value
+               assertNotNull(appProperty);
+               assertTrue(appProperty instanceof Binary);
+
+               final byte[] payload = 
message.getBytesProperty(BYTE_PROPERTY_KEY);
+
+               // The getBytesProperty API should unwrap and return the byte 
array
+               assertEquals(array.length, payload.length);
+               assertArrayEquals(array, payload);
+
+               bytesReturnedFromBrokerMessage.set(true);
+            } catch (Exception ex) {
+               return false; // Reject message if updated encode fails, test 
will fail
+            } finally {
+               arrived.countDown();
+            }
+
+            return true;
+         }
+      });
+
+      final AmqpClient client = createAmqpClient();
+      final AmqpConnection connection = addConnection(client.connect());
+      final AmqpSession session = connection.createSession();
+      final AmqpSender sender = session.createSender(getTestName());
+      final AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("MSG:1");
+      message.setText("Test-Message");
+      message.setApplicationProperty(BYTE_PROPERTY_KEY, new Binary(array));
+
+      sender.send(message);
+
+      assertTrue(arrived.await(2, TimeUnit.SECONDS));
+      assertTrue(bytesReturnedFromBrokerMessage.get());
+
+      sender.close();
+      connection.close();
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to