This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fb4c68681e ARTEMIS-4095: fix delivering message size accounting
fb4c68681e is described below

commit fb4c68681edde5225980b1acf0935af124d0b766
Author: Artyom Tarasenko <atar4q...@gmail.com>
AuthorDate: Fri Jun 23 17:55:26 2023 +0200

    ARTEMIS-4095: fix delivering message size accounting
    
    Signed-off-by: Artyom Tarasenko <atar4q...@gmail.com>
---
 .../core/protocol/openwire/amq/AMQConsumer.java    |  4 +---
 .../protocol/openwire/amq/AMQConsumerTest.java     | 27 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index a98369f48e..3165b14faf 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -288,9 +288,7 @@ public class AMQConsumer {
             return 0;
          }
 
-         if (session.getConnection().isNoLocal() || session.isInternal()) {
-            //internal session always delivers messages to noLocal advisory consumers
-            //so we need to remove this property too.
+         if (session.getConnection().isNoLocal() || (session.isInternal() && AdvisorySupport.isAdvisoryTopic(openwireDestination))) {
             message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
          }
          //handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
index 18dcfc055f..7992c4f040 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumerTest.java
@@ -16,29 +16,55 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.wireformat.WireFormat;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
 public class AMQConsumerTest {
+   final OpenWireFormatFactory formatFactory = new OpenWireFormatFactory();
+   final WireFormat openWireFormat =  formatFactory.createWireFormat();
+
+   @Test
+   public void testClientId() throws Exception {
+      final String CID_ID = "client-12345-6789012345678-0:-1";
+
+      ActiveMQMessage classicMessage = new ActiveMQMessage();
+      classicMessage.setProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING, CID_ID);
+      Message artemisMessage = OpenWireMessageConverter.inbound(classicMessage.getMessage(), openWireFormat, null);
+      assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING));
+      MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
+      AMQConsumer amqConsumer = getConsumer(0);
+      amqConsumer.handleDeliver(messageReference, (ICoreMessage) artemisMessage);
+      assertEquals(CID_ID, artemisMessage.getStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING));
+   }
 
    @Test
    public void testCreditsWithPrefetch() throws Exception {
@@ -69,6 +95,7 @@ public class AMQConsumerTest {
                                               ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyBoolean(),
                                               ArgumentMatchers.nullable(Integer.class))).thenReturn(Mockito.mock(ServerConsumerImpl.class));
       AMQSession session = Mockito.mock(AMQSession.class);
+      Mockito.when(session.isInternal()).thenReturn(true);
       Mockito.when(session.getConnection()).thenReturn(Mockito.mock(OpenWireConnection.class));
       Mockito.when(session.getCoreServer()).thenReturn(coreServer);
       Mockito.when(session.getCoreSession()).thenReturn(coreSession);

Reply via email to