This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new e336b23f74 QPID-8654: [Broker-J] Delayed delivery producers data not
displayed correctly in REST API (#219)
e336b23f74 is described below
commit e336b23f74533db4fe692a7ff7145193032efee2
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Thu Aug 24 13:38:50 2023 +0200
QPID-8654: [Broker-J] Delayed delivery producers data not displayed
correctly in REST API (#219)
---
.../org/apache/qpid/server/model/Producer.java | 9 ++
.../org/apache/qpid/server/model/ProducerImpl.java | 39 ++++++--
.../v1_0/StandardReceivingLinkEndpoint.java | 63 +++++++++++--
.../v1_0/StandardReceivingLinkEndpointTest.java | 100 ++++++++++++++++++++-
4 files changed, 198 insertions(+), 13 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
index 316d124086..f00f8c1bd1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
@@ -25,6 +25,8 @@ import com.google.common.util.concurrent.ListenableFuture;
@ManagedObject(creatable = false, amqpName = "org.apache.qpid.Producer")
public interface Producer<X extends Producer<X>> extends ConfiguredObject<X>
{
+ enum DeliveryType { DELAYED_DELIVERY, STANDARD_DELIVERY }
+
enum DestinationType { EXCHANGE, QUEUE }
void registerMessageDelivered(long messageSize);
@@ -44,9 +46,16 @@ public interface Producer<X extends Producer<X>> extends
ConfiguredObject<X>
@DerivedAttribute(description = "Destination name")
String getDestination();
+ void setDestination(String destination);
+
+ @DerivedAttribute(description = "DeliveryType type (standard or delayed)")
+ DeliveryType getDeliveryType();
+
@DerivedAttribute(description = "Destination type (exchange or queue)")
DestinationType getDestinationType();
+ void setDestinationType(DestinationType destinationType);
+
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.MESSAGES, resettable = true)
int getMessagesOut();
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
index 52e350f272..60b00e33db 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
@@ -46,14 +46,16 @@ public class ProducerImpl<X extends Producer<X>>
private final String _remoteAddress;
- private final String _destination;
-
- private final DestinationType _destinationType;
+ private final DeliveryType _deliveryType;
private final AtomicInteger _messagesOut = new AtomicInteger();
private final AtomicLong _bytesOut = new AtomicLong();
+ private DestinationType _destinationType;
+
+ private String _destination;
+
public ProducerImpl(final AbstractAMQPSession<?, ?> session,
final PublishingLink publishingLink,
final MessageDestination messageDestination)
@@ -63,8 +65,17 @@ public class ProducerImpl<X extends Producer<X>>
_sessionName = session.getName();
_principal = session.getAMQPConnection().getPrincipal();
_remoteAddress = session.getAMQPConnection().getRemoteAddress();
- _destination = messageDestination.getName();
- _destinationType = messageDestination instanceof Exchange ?
DestinationType.EXCHANGE : DestinationType.QUEUE;
+ _destination = messageDestination == null ? null :
messageDestination.getName();
+ if (messageDestination == null)
+ {
+ _deliveryType = DeliveryType.DELAYED_DELIVERY;
+ _destinationType = null;
+ }
+ else
+ {
+ _deliveryType = DeliveryType.STANDARD_DELIVERY;
+ _destinationType = messageDestination instanceof Exchange ?
DestinationType.EXCHANGE : DestinationType.QUEUE;
+ }
registerWithParents();
open();
@@ -132,12 +143,30 @@ public class ProducerImpl<X extends Producer<X>>
return _destination;
}
+ @Override
+ public void setDestination(String destination)
+ {
+ _destination = destination;
+ }
+
@Override
public DestinationType getDestinationType()
{
return _destinationType;
}
+ @Override
+ public void setDestinationType(DestinationType destinationType)
+ {
+ _destinationType = destinationType;
+ }
+
+ @Override
+ public DeliveryType getDeliveryType()
+ {
+ return _deliveryType;
+ }
+
@Override
public int getMessagesOut()
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index a17385f0eb..772935067c 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -39,10 +39,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.messages.SenderMessages;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.DestinationAddress;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Producer;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageFormat;
@@ -285,6 +289,24 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
if (_producer != null)
{
_producer.registerMessageDelivered(serverMessage.getSizeIncludingHeader());
+ if
(Producer.DeliveryType.DELAYED_DELIVERY.equals(_producer.getDeliveryType()))
+ {
+ final String to = serverMessage.getTo();
+ if (!Objects.equals(to,
_producer.getDestination()))
+ {
+ final MessageDestination
messageDestination = getAddressSpace()
+
.getAttainedMessageDestination(serverMessage.getTo(), false);
+ if (messageDestination != null)
+ {
+ final Producer.DestinationType
destinationType =
+ messageDestination
instanceof Exchange
+ ?
Producer.DestinationType.EXCHANGE
+ :
Producer.DestinationType.QUEUE;
+
_producer.setDestinationType(destinationType);
+ }
+ _producer.setDestination(to);
+ }
+ }
}
}
catch (UnroutableMessageException e)
@@ -497,19 +519,37 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
public void setDestination(final ReceivingDestination receivingDestination)
{
- if(_receivingDestination != receivingDestination)
+ if (_receivingDestination != receivingDestination)
{
- if (_receivingDestination != null &&
_receivingDestination.getMessageDestination() != null)
+ if (_receivingDestination != null)
{
getSession().removeProducer(_publishingLink);
+ if (_receivingDestination.getMessageDestination() != null)
+ {
+
_receivingDestination.getMessageDestination().linkRemoved(_messageSender,
_publishingLink);
+ }
+ else
+ {
+ final LogMessage logMessage =
SenderMessages.CLOSE(_publishingLink.getName(), _producer.getDeliveryType() +
+ ":" + _producer.getDestinationType() + ":" +
_producer.getDestination());
+ getSession().getEventLogger().message(logMessage);
+ }
_producer = null;
-
_receivingDestination.getMessageDestination().linkRemoved(_messageSender,
_publishingLink);
}
_receivingDestination = receivingDestination;
- if (receivingDestination != null &&
receivingDestination.getMessageDestination() != null)
+ if (receivingDestination != null)
{
_producer = getSession().addProducer(_publishingLink,
receivingDestination.getMessageDestination());
-
receivingDestination.getMessageDestination().linkAdded(_messageSender,
_publishingLink);
+ if (receivingDestination.getMessageDestination() != null)
+ {
+
receivingDestination.getMessageDestination().linkAdded(_messageSender,
_publishingLink);
+ }
+ else
+ {
+ final String deliveryType =
String.valueOf(_producer.getDeliveryType());
+ final LogMessage logMessage =
SenderMessages.CREATE(_publishingLink.getName(), deliveryType);
+ getSession().getEventLogger().message(logMessage);
+ }
}
}
}
@@ -518,10 +558,19 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
public void destroy()
{
super.destroy();
- if (_receivingDestination != null &&
_receivingDestination.getMessageDestination() != null)
+ if (_receivingDestination != null)
{
getSession().removeProducer(_publishingLink);
-
_receivingDestination.getMessageDestination().linkRemoved(_messageSender,
_publishingLink);
+ if (_receivingDestination.getMessageDestination() != null)
+ {
+
_receivingDestination.getMessageDestination().linkRemoved(_messageSender,
_publishingLink);
+ }
+ else
+ {
+ final LogMessage logMessage =
SenderMessages.CLOSE(_publishingLink.getName(), _producer.getDeliveryType() +
+ ":" + _producer.getDestinationType() + ":" +
_producer.getDestination());
+ getSession().getEventLogger().message(logMessage);
+ }
_receivingDestination = null;
}
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
index 751b13b031..ff7dda767b 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpointTest.java
@@ -20,28 +20,42 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import javax.security.auth.Subject;
+
import org.junit.jupiter.api.Test;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Producer;
import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.test.utils.UnitTestBase;
@SuppressWarnings({"unchecked"})
public class StandardReceivingLinkEndpointTest extends UnitTestBase
{
@Test
- public void linkAddedAndRemoved()
+ public void linkAddedAndRemovedToExchange()
{
final SectionDecoderRegistry sectionDecoderRegistry =
mock(SectionDecoderRegistry.class);
@@ -75,4 +89,88 @@ public class StandardReceivingLinkEndpointTest extends
UnitTestBase
verify(session).removeProducer(any(PublishingLink.class));
verify(exchange).linkRemoved(any(MessageSender.class),
any(PublishingLink.class));
}
+
+ @Test
+ public void linkAddedAndRemovedToQueue()
+ {
+ final SectionDecoderRegistry sectionDecoderRegistry =
mock(SectionDecoderRegistry.class);
+
+ final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry =
mock(AMQPDescribedTypeRegistry.class);
+
doReturn(sectionDecoderRegistry).when(amqpDescribedTypeRegistry).getSectionDecoderRegistry();
+
+ final AMQPConnection_1_0<?> connection =
mock(AMQPConnection_1_0.class);
+
doReturn(amqpDescribedTypeRegistry).when(connection).getDescribedTypeRegistry();
+
+ final Session_1_0 session = mock(Session_1_0.class);
+ doReturn(connection).when(session).getConnection();
+
+ final Link_1_0<Source, Target> link = mock(Link_1_0.class);
+ doReturn("test-link").when(link).getName();
+
+ final StandardReceivingLinkEndpoint standardReceivingLinkEndpoint =
+ new StandardReceivingLinkEndpoint(session, link);
+
+ final Queue<?> queue = mock(Queue.class);
+
+ final ReceivingDestination receivingDestination =
mock(ReceivingDestination.class);
+ doReturn(queue).when(receivingDestination).getMessageDestination();
+
+ standardReceivingLinkEndpoint.setDestination(receivingDestination);
+
+ verify(session).addProducer(any(PublishingLink.class), eq(queue));
+ verify(queue).linkAdded(any(MessageSender.class),
any(PublishingLink.class));
+
+ standardReceivingLinkEndpoint.destroy();
+
+ verify(session).removeProducer(any(PublishingLink.class));
+ verify(queue).linkRemoved(any(MessageSender.class),
any(PublishingLink.class));
+ }
+
+ @Test
+ public void linkAddedAndRemovedAnonymously()
+ {
+ final SectionDecoderRegistry sectionDecoderRegistry =
mock(SectionDecoderRegistry.class);
+
+ final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry =
mock(AMQPDescribedTypeRegistry.class);
+
doReturn(sectionDecoderRegistry).when(amqpDescribedTypeRegistry).getSectionDecoderRegistry();
+
+ final AMQPConnection_1_0<?> connection =
mock(AMQPConnection_1_0.class);
+
doReturn(amqpDescribedTypeRegistry).when(connection).getDescribedTypeRegistry();
+
doReturn(CurrentThreadTaskExecutor.newStartedInstance()).when(connection).getChildExecutor();
+ doReturn(BrokerModel.getInstance()).when(connection).getModel();
+ doReturn(new Subject()).when(connection).getSubject();
+ doReturn(mock(Broker.class)).when(connection).getBroker();
+
doReturn(mock(NamedAddressSpace.class)).when(connection).getAddressSpace();
+ doReturn(mock(EventLogger.class)).when(connection).getEventLogger();
+ doReturn(0L).when(connection).getContextValue(Long.class,
Session.PRODUCER_AUTH_CACHE_TIMEOUT);
+ doReturn(0).when(connection).getContextValue(Integer.class,
Session.PRODUCER_AUTH_CACHE_SIZE);
+
+ final Begin begin = mock(Begin.class);
+ doReturn(new UnsignedInteger(0)).when(begin).getNextOutgoingId();
+ final Session_1_0 session = spy(new Session_1_0(connection, begin, 0,
0, 1000L));
+
+ final Link_1_0<Source, Target> link = mock(Link_1_0.class);
+ doReturn("test-link").when(link).getName();
+
+ final StandardReceivingLinkEndpoint standardReceivingLinkEndpoint =
+ new StandardReceivingLinkEndpoint(session, link);
+
+ final ReceivingDestination receivingDestination =
mock(ReceivingDestination.class);
+ doReturn(null).when(receivingDestination).getMessageDestination();
+
+ assertEquals(0, session.getProducerCount());
+ assertEquals(0, session.getChildren(Producer.class).size());
+
+ standardReceivingLinkEndpoint.setDestination(receivingDestination);
+
+ verify(session).addProducer(any(PublishingLink.class), eq(null));
+ assertEquals(1, session.getProducerCount());
+ assertEquals(1, session.getChildren(Producer.class).size());
+
+ standardReceivingLinkEndpoint.destroy();
+
+ verify(session).removeProducer(any(PublishingLink.class));
+ assertEquals(0, session.getProducerCount());
+ assertEquals(0, session.getChildren(Producer.class).size());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]