This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new e336b23f74 QPID-8654: [Broker-J] Delayed delivery producers data not displayed correctly in REST API (#219)
e336b23f74 is described below

commit e336b23f74533db4fe692a7ff7145193032efee2
Author: Daniil Kirilyuk <daniel.kiril...@gmail.com>
AuthorDate: Thu Aug 24 13:38:50 2023 +0200

    QPID-8654: [Broker-J] Delayed delivery producers data not displayed correctly in REST API (#219)
---
 .../org/apache/qpid/server/model/Producer.java     |   9 ++
 .../org/apache/qpid/server/model/ProducerImpl.java |  39 ++++++--
 .../v1_0/StandardReceivingLinkEndpoint.java        |  63 +++++++++++--
 .../v1_0/StandardReceivingLinkEndpointTest.java    | 100 ++++++++++++++++++++-
 4 files changed, 198 insertions(+), 13 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
index 316d124086..f00f8c1bd1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
@@ -25,6 +25,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 @ManagedObject(creatable = false, amqpName = "org.apache.qpid.Producer")
 public interface Producer<X extends Producer<X>> extends ConfiguredObject<X>
 {
+    enum DeliveryType { DELAYED_DELIVERY, STANDARD_DELIVERY }
+
     enum DestinationType { EXCHANGE, QUEUE }
 
     void registerMessageDelivered(long messageSize);
@@ -44,9 +46,16 @@ public interface Producer<X extends Producer<X>> extends ConfiguredObject<X>
     @DerivedAttribute(description = "Destination name")
     String getDestination();
 
+    void setDestination(String destination);
+
+    @DerivedAttribute(description = "DeliveryType type (standard or delayed)")
+    DeliveryType getDeliveryType();
+
     @DerivedAttribute(description = "Destination type (exchange or queue)")
     DestinationType getDestinationType();
 
+    void setDestinationType(DestinationType destinationType);
+
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.MESSAGES, resettable = true)
     int getMessagesOut();
 
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
index 52e350f272..60b00e33db 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
@@ -46,14 +46,16 @@ public class ProducerImpl<X extends Producer<X>>
 
     private final String _remoteAddress;
 
-    private final String _destination;
-
-    private final DestinationType _destinationType;
+    private final DeliveryType _deliveryType;
 
     private final AtomicInteger _messagesOut = new AtomicInteger();
 
     private final AtomicLong _bytesOut = new AtomicLong();
 
+    private DestinationType _destinationType;
+
+    private String _destination;
+
     public ProducerImpl(final AbstractAMQPSession<?, ?> session,
                         final PublishingLink publishingLink,
                         final MessageDestination messageDestination)
@@ -63,8 +65,17 @@ public class ProducerImpl<X extends Producer<X>>
         _sessionName = session.getName();
         _principal = session.getAMQPConnection().getPrincipal();
         _remoteAddress = session.getAMQPConnection().getRemoteAddress();
-        _destination = messageDestination.getName();
-        _destinationType = messageDestination instanceof Exchange ? 
DestinationType.EXCHANGE : DestinationType.QUEUE;
+        _destination = messageDestination == null ? null : 
messageDestination.getName();
+        if (messageDestination == null)
+        {
+            _deliveryType = DeliveryType.DELAYED_DELIVERY;
+            _destinationType = null;
+        }
+        else
+        {
+            _deliveryType = DeliveryType.STANDARD_DELIVERY;
+            _destinationType = messageDestination instanceof Exchange ? 
DestinationType.EXCHANGE : DestinationType.QUEUE;
+        }
 
         registerWithParents();
         open();
@@ -132,12 +143,30 @@ public class ProducerImpl<X extends Producer<X>>
         return _destination;
     }
 
+    @Override
+    public void setDestination(String destination)
+    {
+        _destination = destination;
+    }
+
     @Override
     public DestinationType getDestinationType()
     {
         return _destinationType;
     }
 
+    @Override
+    public void setDestinationType(DestinationType destinationType)
+    {
+        _destinationType = destinationType;
+    }
+
+    @Override
+    public DeliveryType getDeliveryType()
+    {
+        return _deliveryType;
+    }
+
     @Override
     public int getMessagesOut()
     {
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index a17385f0eb..772935067c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -39,10 +39,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.messages.SenderMessages;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Producer;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageFormat;
@@ -285,6 +289,24 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                             if (_producer != null)
                             {
                                 
_producer.registerMessageDelivered(serverMessage.getSizeIncludingHeader());
+                                if 
(Producer.DeliveryType.DELAYED_DELIVERY.equals(_producer.getDeliveryType()))
+                                {
+                                    final String to = serverMessage.getTo();
+                                    if (!Objects.equals(to, 
_producer.getDestination()))
+                                    {
+                                        final MessageDestination 
messageDestination = getAddressSpace()
+                                                
.getAttainedMessageDestination(serverMessage.getTo(), false);
+                                        if (messageDestination != null)
+                                        {
+                                            final Producer.DestinationType 
destinationType =
+                                                    messageDestination 
instanceof Exchange
+                                                            ? 
Producer.DestinationType.EXCHANGE
+                                                            : 
Producer.DestinationType.QUEUE;
+                                            
_producer.setDestinationType(destinationType);
+                                        }
+                                        _producer.setDestination(to);
+                                    }
+                                }
                             }
                         }
                         catch (UnroutableMessageException e)
@@ -497,19 +519,37 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
     public void setDestination(final ReceivingDestination receivingDestination)
     {
-        if(_receivingDestination != receivingDestination)
+        if (_receivingDestination != receivingDestination)
         {
-            if (_receivingDestination != null && 
_receivingDestination.getMessageDestination() != null)
+            if (_receivingDestination != null)
             {
                 getSession().removeProducer(_publishingLink);
+                if (_receivingDestination.getMessageDestination() != null)
+                {
+                    
_receivingDestination.getMessageDestination().linkRemoved(_messageSender, 
_publishingLink);
+                }
+                else
+                {
+                    final LogMessage logMessage = 
SenderMessages.CLOSE(_publishingLink.getName(), _producer.getDeliveryType() +
+                            ":" + _producer.getDestinationType() + ":" + 
_producer.getDestination());
+                    getSession().getEventLogger().message(logMessage);
+                }
                 _producer = null;
-                
_receivingDestination.getMessageDestination().linkRemoved(_messageSender, 
_publishingLink);
             }
             _receivingDestination = receivingDestination;
-            if (receivingDestination != null && 
receivingDestination.getMessageDestination() != null)
+            if (receivingDestination != null)
             {
                 _producer = getSession().addProducer(_publishingLink, 
receivingDestination.getMessageDestination());
-                
receivingDestination.getMessageDestination().linkAdded(_messageSender, 
_publishingLink);
+                if (receivingDestination.getMessageDestination() != null)
+                {
+                    
receivingDestination.getMessageDestination().linkAdded(_messageSender, 
_publishingLink);
+                }
+                else
+                {
+                    final String deliveryType = 
String.valueOf(_producer.getDeliveryType());
+                    final LogMessage logMessage = 
SenderMessages.CREATE(_publishingLink.getName(), deliveryType);
+                    getSession().getEventLogger().message(logMessage);
+                }
             }
         }
     }
@@ -518,10 +558,19 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
     public void destroy()
     {
         super.destroy();
-        if (_receivingDestination != null && 
_receivingDestination.getMessageDestination() != null)
+        if (_receivingDestination != null)
         {
             getSession().removeProducer(_publishingLink);
-            
_receivingDestination.getMessageDestination().linkRemoved(_messageSender, 
_publishingLink);
+            if (_receivingDestination.getMessageDestination() != null)
+            {
+                
_receivingDestination.getMessageDestination().linkRemoved(_messageSender, 
_publishingLink);
+            }
+            else
+            {
+                final LogMessage logMessage = 
SenderMessages.CLOSE(_publishingLink.getName(), _producer.getDeliveryType() +
+                        ":" + _producer.getDestinationType() + ":" + 
_producer.getDestination());
+                getSession().getEventLogger().message(logMessage);
+            }
             _receivingDestination = null;
         }
     }
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
index 751b13b031..ff7dda767b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
@@ -20,28 +20,42 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import javax.security.auth.Subject;
+
 import org.junit.jupiter.api.Test;
 
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Producer;
 import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.test.utils.UnitTestBase;
 
 @SuppressWarnings({"unchecked"})
 public class StandardReceivingLinkEndpointTest extends UnitTestBase
 {
     @Test
-    public void linkAddedAndRemoved()
+    public void linkAddedAndRemovedToExchange()
     {
         final SectionDecoderRegistry sectionDecoderRegistry = 
mock(SectionDecoderRegistry.class);
 
@@ -75,4 +89,88 @@ public class StandardReceivingLinkEndpointTest extends UnitTestBase
         verify(session).removeProducer(any(PublishingLink.class));
         verify(exchange).linkRemoved(any(MessageSender.class), 
any(PublishingLink.class));
     }
+
+    @Test
+    public void linkAddedAndRemovedToQueue()
+    {
+        final SectionDecoderRegistry sectionDecoderRegistry = 
mock(SectionDecoderRegistry.class);
+
+        final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry = 
mock(AMQPDescribedTypeRegistry.class);
+        
doReturn(sectionDecoderRegistry).when(amqpDescribedTypeRegistry).getSectionDecoderRegistry();
+
+        final AMQPConnection_1_0<?> connection = 
mock(AMQPConnection_1_0.class);
+        
doReturn(amqpDescribedTypeRegistry).when(connection).getDescribedTypeRegistry();
+
+        final Session_1_0 session = mock(Session_1_0.class);
+        doReturn(connection).when(session).getConnection();
+
+        final Link_1_0<Source, Target> link = mock(Link_1_0.class);
+        doReturn("test-link").when(link).getName();
+
+        final StandardReceivingLinkEndpoint standardReceivingLinkEndpoint =
+                new StandardReceivingLinkEndpoint(session, link);
+
+        final Queue<?> queue = mock(Queue.class);
+
+        final ReceivingDestination receivingDestination = 
mock(ReceivingDestination.class);
+        doReturn(queue).when(receivingDestination).getMessageDestination();
+
+        standardReceivingLinkEndpoint.setDestination(receivingDestination);
+
+        verify(session).addProducer(any(PublishingLink.class), eq(queue));
+        verify(queue).linkAdded(any(MessageSender.class), 
any(PublishingLink.class));
+
+        standardReceivingLinkEndpoint.destroy();
+
+        verify(session).removeProducer(any(PublishingLink.class));
+        verify(queue).linkRemoved(any(MessageSender.class), 
any(PublishingLink.class));
+    }
+
+    @Test
+    public void linkAddedAndRemovedAnonymously()
+    {
+        final SectionDecoderRegistry sectionDecoderRegistry = 
mock(SectionDecoderRegistry.class);
+
+        final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry = 
mock(AMQPDescribedTypeRegistry.class);
+        
doReturn(sectionDecoderRegistry).when(amqpDescribedTypeRegistry).getSectionDecoderRegistry();
+
+        final AMQPConnection_1_0<?> connection = 
mock(AMQPConnection_1_0.class);
+        
doReturn(amqpDescribedTypeRegistry).when(connection).getDescribedTypeRegistry();
+        
doReturn(CurrentThreadTaskExecutor.newStartedInstance()).when(connection).getChildExecutor();
+        doReturn(BrokerModel.getInstance()).when(connection).getModel();
+        doReturn(new Subject()).when(connection).getSubject();
+        doReturn(mock(Broker.class)).when(connection).getBroker();
+        
doReturn(mock(NamedAddressSpace.class)).when(connection).getAddressSpace();
+        doReturn(mock(EventLogger.class)).when(connection).getEventLogger();
+        doReturn(0L).when(connection).getContextValue(Long.class, 
Session.PRODUCER_AUTH_CACHE_TIMEOUT);
+        doReturn(0).when(connection).getContextValue(Integer.class, 
Session.PRODUCER_AUTH_CACHE_SIZE);
+
+        final Begin begin = mock(Begin.class);
+        doReturn(new UnsignedInteger(0)).when(begin).getNextOutgoingId();
+        final Session_1_0 session = spy(new Session_1_0(connection, begin, 0, 
0, 1000L));
+
+        final Link_1_0<Source, Target> link = mock(Link_1_0.class);
+        doReturn("test-link").when(link).getName();
+
+        final StandardReceivingLinkEndpoint standardReceivingLinkEndpoint =
+                new StandardReceivingLinkEndpoint(session, link);
+
+        final ReceivingDestination receivingDestination = 
mock(ReceivingDestination.class);
+        doReturn(null).when(receivingDestination).getMessageDestination();
+
+        assertEquals(0, session.getProducerCount());
+        assertEquals(0, session.getChildren(Producer.class).size());
+
+        standardReceivingLinkEndpoint.setDestination(receivingDestination);
+
+        verify(session).addProducer(any(PublishingLink.class), eq(null));
+        assertEquals(1, session.getProducerCount());
+        assertEquals(1, session.getChildren(Producer.class).size());
+
+        standardReceivingLinkEndpoint.destroy();
+
+        verify(session).removeProducer(any(PublishingLink.class));
+        assertEquals(0, session.getProducerCount());
+        assertEquals(0, session.getChildren(Producer.class).size());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to