This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new e336b23f74 QPID-8654: [Broker-J] Delayed delivery producers data not displayed correctly in REST API (#219) e336b23f74 is described below commit e336b23f74533db4fe692a7ff7145193032efee2 Author: Daniil Kirilyuk <daniel.kiril...@gmail.com> AuthorDate: Thu Aug 24 13:38:50 2023 +0200 QPID-8654: [Broker-J] Delayed delivery producers data not displayed correctly in REST API (#219) --- .../org/apache/qpid/server/model/Producer.java | 9 ++ .../org/apache/qpid/server/model/ProducerImpl.java | 39 ++++++-- .../v1_0/StandardReceivingLinkEndpoint.java | 63 +++++++++++-- .../v1_0/StandardReceivingLinkEndpointTest.java | 100 ++++++++++++++++++++- 4 files changed, 198 insertions(+), 13 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java index 316d124086..f00f8c1bd1 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java @@ -25,6 +25,8 @@ import com.google.common.util.concurrent.ListenableFuture; @ManagedObject(creatable = false, amqpName = "org.apache.qpid.Producer") public interface Producer<X extends Producer<X>> extends ConfiguredObject<X> { + enum DeliveryType { DELAYED_DELIVERY, STANDARD_DELIVERY } + enum DestinationType { EXCHANGE, QUEUE } void registerMessageDelivered(long messageSize); @@ -44,9 +46,16 @@ public interface Producer<X extends Producer<X>> extends ConfiguredObject<X> @DerivedAttribute(description = "Destination name") String getDestination(); + void setDestination(String destination); + + @DerivedAttribute(description = "DeliveryType type (standard or delayed)") + DeliveryType getDeliveryType(); + @DerivedAttribute(description = "Destination type (exchange or queue)") DestinationType getDestinationType(); + void setDestinationType(DestinationType destinationType); + @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, resettable = true) int getMessagesOut(); diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java index 52e350f272..60b00e33db 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java @@ -46,14 +46,16 @@ public class ProducerImpl<X extends Producer<X>> private final String _remoteAddress; - private final String _destination; - - private final DestinationType _destinationType; + private final DeliveryType _deliveryType; private final AtomicInteger _messagesOut = new AtomicInteger(); private final AtomicLong _bytesOut = new AtomicLong(); + private DestinationType _destinationType; + + private String _destination; + public ProducerImpl(final AbstractAMQPSession<?, ?> session, final PublishingLink publishingLink, final MessageDestination messageDestination) @@ -63,8 +65,17 @@ public class ProducerImpl<X extends Producer<X>> _sessionName = session.getName(); _principal = session.getAMQPConnection().getPrincipal(); _remoteAddress = session.getAMQPConnection().getRemoteAddress(); - _destination = messageDestination.getName(); - _destinationType = messageDestination instanceof Exchange ? DestinationType.EXCHANGE : DestinationType.QUEUE; + _destination = messageDestination == null ? null : messageDestination.getName(); + if (messageDestination == null) + { + _deliveryType = DeliveryType.DELAYED_DELIVERY; + _destinationType = null; + } + else + { + _deliveryType = DeliveryType.STANDARD_DELIVERY; + _destinationType = messageDestination instanceof Exchange ? DestinationType.EXCHANGE : DestinationType.QUEUE; + } registerWithParents(); open(); @@ -132,12 +143,30 @@ public class ProducerImpl<X extends Producer<X>> return _destination; } + @Override + public void setDestination(String destination) + { + _destination = destination; + } + @Override public DestinationType getDestinationType() { return _destinationType; } + @Override + public void setDestinationType(DestinationType destinationType) + { + _destinationType = destinationType; + } + + @Override + public DeliveryType getDeliveryType() + { + return _deliveryType; + } + @Override public int getMessagesOut() { diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index a17385f0eb..772935067c 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -39,10 +39,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.messages.SenderMessages; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSender; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.DestinationAddress; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Producer; import org.apache.qpid.server.model.PublishingLink; import org.apache.qpid.server.plugin.MessageFormat; @@ -285,6 +289,24 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint if (_producer != null) { _producer.registerMessageDelivered(serverMessage.getSizeIncludingHeader()); + if (Producer.DeliveryType.DELAYED_DELIVERY.equals(_producer.getDeliveryType())) + { + final String to = serverMessage.getTo(); + if (!Objects.equals(to, _producer.getDestination())) + { + final MessageDestination messageDestination = getAddressSpace() + .getAttainedMessageDestination(serverMessage.getTo(), false); + if (messageDestination != null) + { + final Producer.DestinationType destinationType = + messageDestination instanceof Exchange + ? Producer.DestinationType.EXCHANGE + : Producer.DestinationType.QUEUE; + _producer.setDestinationType(destinationType); + } + _producer.setDestination(to); + } + } } } catch (UnroutableMessageException e) @@ -497,19 +519,37 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint public void setDestination(final ReceivingDestination receivingDestination) { - if(_receivingDestination != receivingDestination) + if (_receivingDestination != receivingDestination) { - if (_receivingDestination != null && _receivingDestination.getMessageDestination() != null) + if (_receivingDestination != null) { getSession().removeProducer(_publishingLink); + if (_receivingDestination.getMessageDestination() != null) + { + _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); + } + else + { + final LogMessage logMessage = SenderMessages.CLOSE(_publishingLink.getName(), _producer.getDeliveryType() + + ":" + _producer.getDestinationType() + ":" + _producer.getDestination()); + getSession().getEventLogger().message(logMessage); + } _producer = null; - _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); } _receivingDestination = receivingDestination; - if (receivingDestination != null && receivingDestination.getMessageDestination() != null) + if (receivingDestination != null) { _producer = getSession().addProducer(_publishingLink, receivingDestination.getMessageDestination()); - receivingDestination.getMessageDestination().linkAdded(_messageSender, _publishingLink); + if (receivingDestination.getMessageDestination() != null) + { + receivingDestination.getMessageDestination().linkAdded(_messageSender, _publishingLink); + } + else + { + final String deliveryType = String.valueOf(_producer.getDeliveryType()); + final LogMessage logMessage = SenderMessages.CREATE(_publishingLink.getName(), deliveryType); + getSession().getEventLogger().message(logMessage); + } } } } @@ -518,10 +558,19 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint public void destroy() { super.destroy(); - if (_receivingDestination != null && _receivingDestination.getMessageDestination() != null) + if (_receivingDestination != null) { getSession().removeProducer(_publishingLink); - _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); + if (_receivingDestination.getMessageDestination() != null) + { + _receivingDestination.getMessageDestination().linkRemoved(_messageSender, _publishingLink); + } + else + { + final LogMessage logMessage = SenderMessages.CLOSE(_publishingLink.getName(), _producer.getDeliveryType() + + ":" + _producer.getDestinationType() + ":" + _producer.getDestination()); + getSession().getEventLogger().message(logMessage); + } _receivingDestination = null; } } diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java index 751b13b031..ff7dda767b 100644 --- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java @@ -20,28 +20,42 @@ */ package org.apache.qpid.server.protocol.v1_0; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import javax.security.auth.Subject; + import org.junit.jupiter.api.Test; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageSender; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.model.Producer; import org.apache.qpid.server.model.PublishingLink; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.test.utils.UnitTestBase; @SuppressWarnings({"unchecked"}) public class StandardReceivingLinkEndpointTest extends UnitTestBase { @Test - public void linkAddedAndRemoved() + public void linkAddedAndRemovedToExchange() { final SectionDecoderRegistry sectionDecoderRegistry = mock(SectionDecoderRegistry.class); @@ -75,4 +89,88 @@ public class StandardReceivingLinkEndpointTest extends UnitTestBase verify(session).removeProducer(any(PublishingLink.class)); verify(exchange).linkRemoved(any(MessageSender.class), any(PublishingLink.class)); } + + @Test + public void linkAddedAndRemovedToQueue() + { + final SectionDecoderRegistry sectionDecoderRegistry = mock(SectionDecoderRegistry.class); + + final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry = mock(AMQPDescribedTypeRegistry.class); + doReturn(sectionDecoderRegistry).when(amqpDescribedTypeRegistry).getSectionDecoderRegistry(); + + final AMQPConnection_1_0<?> connection = mock(AMQPConnection_1_0.class); + doReturn(amqpDescribedTypeRegistry).when(connection).getDescribedTypeRegistry(); + + final Session_1_0 session = mock(Session_1_0.class); + doReturn(connection).when(session).getConnection(); + + final Link_1_0<Source, Target> link = mock(Link_1_0.class); + doReturn("test-link").when(link).getName(); + + final StandardReceivingLinkEndpoint standardReceivingLinkEndpoint = + new StandardReceivingLinkEndpoint(session, link); + + final Queue<?> queue = mock(Queue.class); + + final ReceivingDestination receivingDestination = mock(ReceivingDestination.class); + doReturn(queue).when(receivingDestination).getMessageDestination(); + + standardReceivingLinkEndpoint.setDestination(receivingDestination); + + verify(session).addProducer(any(PublishingLink.class), eq(queue)); + verify(queue).linkAdded(any(MessageSender.class), any(PublishingLink.class)); + + standardReceivingLinkEndpoint.destroy(); + + verify(session).removeProducer(any(PublishingLink.class)); + verify(queue).linkRemoved(any(MessageSender.class), any(PublishingLink.class)); + } + + @Test + public void linkAddedAndRemovedAnonymously() + { + final SectionDecoderRegistry sectionDecoderRegistry = mock(SectionDecoderRegistry.class); + + final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry = mock(AMQPDescribedTypeRegistry.class); + doReturn(sectionDecoderRegistry).when(amqpDescribedTypeRegistry).getSectionDecoderRegistry(); + + final AMQPConnection_1_0<?> connection = mock(AMQPConnection_1_0.class); + doReturn(amqpDescribedTypeRegistry).when(connection).getDescribedTypeRegistry(); + doReturn(CurrentThreadTaskExecutor.newStartedInstance()).when(connection).getChildExecutor(); + doReturn(BrokerModel.getInstance()).when(connection).getModel(); + doReturn(new Subject()).when(connection).getSubject(); + doReturn(mock(Broker.class)).when(connection).getBroker(); + doReturn(mock(NamedAddressSpace.class)).when(connection).getAddressSpace(); + doReturn(mock(EventLogger.class)).when(connection).getEventLogger(); + doReturn(0L).when(connection).getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT); + doReturn(0).when(connection).getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE); + + final Begin begin = mock(Begin.class); + doReturn(new UnsignedInteger(0)).when(begin).getNextOutgoingId(); + final Session_1_0 session = spy(new Session_1_0(connection, begin, 0, 0, 1000L)); + + final Link_1_0<Source, Target> link = mock(Link_1_0.class); + doReturn("test-link").when(link).getName(); + + final StandardReceivingLinkEndpoint standardReceivingLinkEndpoint = + new StandardReceivingLinkEndpoint(session, link); + + final ReceivingDestination receivingDestination = mock(ReceivingDestination.class); + doReturn(null).when(receivingDestination).getMessageDestination(); + + assertEquals(0, session.getProducerCount()); + assertEquals(0, session.getChildren(Producer.class).size()); + + standardReceivingLinkEndpoint.setDestination(receivingDestination); + + verify(session).addProducer(any(PublishingLink.class), eq(null)); + assertEquals(1, session.getProducerCount()); + assertEquals(1, session.getChildren(Producer.class).size()); + + standardReceivingLinkEndpoint.destroy(); + + verify(session).removeProducer(any(PublishingLink.class)); + assertEquals(0, session.getProducerCount()); + assertEquals(0, session.getChildren(Producer.class).size()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org