This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e9bac0654 QPID-8602: [Broker-J] Show producer details in REST API (#142)
4e9bac0654 is described below

commit 4e9bac0654125286de8653ff1b6bddb70264489b
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Tue Oct 11 16:00:36 2022 +0200

    QPID-8602: [Broker-J] Show producer details in REST API (#142)
---
 .../qpid/server/exchange/AbstractExchange.java     | 172 ++++++++++++---------
 .../org/apache/qpid/server/model/BrokerModel.java  |   5 +
 .../org/apache/qpid/server/model/Exchange.java     |   3 +
 .../org/apache/qpid/server/model/Producer.java     |  57 +++++++
 .../org/apache/qpid/server/model/ProducerImpl.java | 152 ++++++++++++++++++
 .../java/org/apache/qpid/server/model/Queue.java   |   5 +-
 .../java/org/apache/qpid/server/model/Session.java |   5 +
 .../apache/qpid/server/queue/AbstractQueue.java    |  31 ++--
 .../qpid/server/session/AbstractAMQPSession.java   |  68 ++++++++
 .../apache/qpid/server/exchange/ProducerTest.java  | 135 ++++++++++++++++
 .../v1_0/StandardReceivingLinkEndpoint.java        |  19 ++-
 .../Java-Broker-Management-Managing-Entities.xml   |   1 +
 .../Java-Broker-Management-Managing-Producers.xml  |  29 ++++
 13 files changed, 590 insertions(+), 92 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 17f62f91fe..32a7ce1502 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -107,7 +107,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     private static final Operation PUBLISH_ACTION = 
Operation.PERFORM_ACTION("publish");
     private final AtomicBoolean _closed = new AtomicBoolean();
 
-    @ManagedAttributeField(beforeSet = "preSetAlternateBinding", afterSet = 
"postSetAlternateBinding" )
+    @ManagedAttributeField(beforeSet = "preSetAlternateBinding", afterSet = 
"postSetAlternateBinding")
     private AlternateBinding _alternateBinding;
     @ManagedAttributeField
     private UnroutableMessageBehaviour _unroutableMessageBehaviour;
@@ -123,7 +123,8 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
 
     //The logSubject for ths exchange
     private LogSubject _logSubject;
-    private final Set<DestinationReferrer> _referrers = 
Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
+    private final Set<DestinationReferrer> _referrers =
+            Collections.newSetFromMap(new 
ConcurrentHashMap<DestinationReferrer, Boolean>());
 
     private final AtomicLong _receivedMessageCount = new AtomicLong();
     private final AtomicLong _receivedMessageSize = new AtomicLong();
@@ -131,6 +132,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     private final AtomicLong _routedMessageSize = new AtomicLong();
     private final AtomicLong _droppedMessageCount = new AtomicLong();
     private final AtomicLong _droppedMessageSize = new AtomicLong();
+    private final AtomicLong _producerCount = new AtomicLong();
 
     private final List<Binding> _bindings = new CopyOnWriteArrayList<>();
 
@@ -143,7 +145,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
         super(vhost, attributes);
         Set<String> providedAttributeNames = new 
HashSet<>(attributes.keySet());
         providedAttributeNames.removeAll(getAttributeNames());
-        if(!providedAttributeNames.isEmpty())
+        if (!providedAttributeNames.isEmpty())
         {
             throw new IllegalArgumentException("Unknown attributes provided: " 
+ providedAttributeNames);
         }
@@ -157,7 +159,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     {
         super.onValidate();
 
-        if(!isSystemProcess())
+        if (!isSystemProcess())
         {
             if (isReservedExchangeName(getName()))
             {
@@ -173,11 +175,11 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
 
         validateOrCreateAlternateBinding(((Exchange<?>) proxyForValidation), 
false);
 
-        if (changedAttributes.contains(ConfiguredObject.DESIRED_STATE) && 
proxyForValidation.getDesiredState() == State.DELETED)
+        if (changedAttributes.contains(ConfiguredObject.DESIRED_STATE) &&
+            proxyForValidation.getDesiredState() == State.DELETED)
         {
             doChecks();
         }
-
     }
 
     private boolean isReservedExchangeName(String name)
@@ -192,7 +194,8 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
         super.validateOnCreate();
         if (getCreatingLinkInfo() != null && !isSystemProcess())
         {
-            throw new IllegalConfigurationException(String.format("Cannot 
specify creatingLinkInfo for exchange '%s'", getName()));
+            throw new IllegalConfigurationException(String.format("Cannot 
specify creatingLinkInfo for exchange '%s'",
+                                                                  getName()));
         }
     }
 
@@ -208,7 +211,10 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     {
         super.onOpen();
         final ConfiguredDerivedMethodAttribute<Exchange<?>, 
Collection<Binding>> durableBindingsAttribute =
-                (ConfiguredDerivedMethodAttribute<Exchange<?>, 
Collection<Binding>>) 
getModel().getTypeRegistry().getAttributeTypes(getTypeClass()).get(DURABLE_BINDINGS);
+                (ConfiguredDerivedMethodAttribute<Exchange<?>, 
Collection<Binding>>) getModel().getTypeRegistry()
+                                                                               
                .getAttributeTypes(
+                                                                               
                        getTypeClass())
+                                                                               
                .get(DURABLE_BINDINGS);
         final Collection<Binding> bindings =
                 
durableBindingsAttribute.convertValue(getActualAttributes().get(DURABLE_BINDINGS),
 this);
         if (bindings != null)
@@ -219,7 +225,8 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
                 final MessageDestination messageDestination = 
getOpenedMessageDestination(b.getDestination());
                 if (messageDestination != null)
                 {
-                    Map<String, Object> arguments = b.getArguments() == null ? 
Collections.emptyMap() : b.getArguments();
+                    Map<String, Object> arguments =
+                            b.getArguments() == null ? Collections.emptyMap() 
: b.getArguments();
                     try
                     {
                         onBind(new BindingIdentifier(b.getBindingKey(), 
messageDestination), arguments);
@@ -240,11 +247,13 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
                 final LinkModel link;
                 if (_creatingLinkInfo.isSendingLink())
                 {
-                    link = 
_virtualHost.getSendingLink(_creatingLinkInfo.getRemoteContainerId(), 
_creatingLinkInfo.getLinkName());
+                    link = 
_virtualHost.getSendingLink(_creatingLinkInfo.getRemoteContainerId(),
+                                                       
_creatingLinkInfo.getLinkName());
                 }
                 else
                 {
-                    link = 
_virtualHost.getReceivingLink(_creatingLinkInfo.getRemoteContainerId(), 
_creatingLinkInfo.getLinkName());
+                    link = 
_virtualHost.getReceivingLink(_creatingLinkInfo.getRemoteContainerId(),
+                                                         
_creatingLinkInfo.getLinkName());
                 }
                 addLifetimeConstraint(link);
             }
@@ -266,7 +275,9 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             }
             else
             {
-                LOGGER.warn("Cannot find alternate binding destination '{}' 
for exchange '{}'", alternateDestination, toString());
+                LOGGER.warn("Cannot find alternate binding destination '{}' 
for exchange '{}'",
+                            alternateDestination,
+                            toString());
             }
         }
     }
@@ -308,19 +319,19 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
 
     private void performDelete()
     {
-        if(_closed.compareAndSet(false,true))
+        if (_closed.compareAndSet(false, true))
         {
             performDeleteTasks();
 
-            for(Binding b : _bindings)
+            for (Binding b : _bindings)
             {
                 final MessageDestination messageDestination = 
getAttainedMessageDestination(b.getDestination());
-                if(messageDestination != null)
+                if (messageDestination != null)
                 {
                     messageDestination.linkRemoved(this, b);
                 }
             }
-            for(MessageSender sender : _linkedSenders.keySet())
+            for (MessageSender sender : _linkedSenders.keySet())
             {
                 sender.destinationRemoved(this);
             }
@@ -334,12 +345,12 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
 
     private void doChecks()
     {
-        if(hasReferrers())
+        if (hasReferrers())
         {
             throw new MessageDestinationIsAlternateException(getName());
         }
 
-        if(isReservedExchangeName(getName()))
+        if (isReservedExchangeName(getName()))
         {
             throw new RequiredExchangeException(getName());
         }
@@ -347,13 +358,13 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
 
     @Override
     @DoOnConfigThread
-    public void destinationRemoved(@Param(name="destination") final 
MessageDestination destination)
+    public void destinationRemoved(@Param(name = "destination") final 
MessageDestination destination)
     {
         Iterator<Binding> bindingIterator = _bindings.iterator();
-        while(bindingIterator.hasNext())
+        while (bindingIterator.hasNext())
         {
             Binding b = bindingIterator.next();
-            if(b.getDestination().equals(destination.getName()))
+            if (b.getDestination().equals(destination.getName()))
             {
                 final Map<String, Object> bindArguments =
                         UNBIND_ARGUMENTS_CREATOR.createMap(b.getBindingKey(), 
destination);
@@ -362,7 +373,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
                 _bindings.remove(b);
             }
         }
-        if(!autoDeleteIfNecessary())
+        if (!autoDeleteIfNecessary())
         {
             if (destination.isDurable() && isDurable())
             {
@@ -381,7 +392,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     @Override
     public String toString()
     {
-        return getClass().getSimpleName() + "[" + getName() +"]";
+        return getClass().getSimpleName() + "[" + getName() + "]";
     }
 
     @Override
@@ -391,19 +402,19 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     }
 
     @Override
-    public boolean isBound(String bindingKey, Map<String,Object> arguments, 
Queue<?> queue)
+    public boolean isBound(String bindingKey, Map<String, Object> arguments, 
Queue<?> queue)
     {
         if (bindingKey == null)
         {
             bindingKey = "";
         }
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(bindingKey.equals(b.getBindingKey()) && 
queue.getName().equals(b.getDestination()))
+            if (bindingKey.equals(b.getBindingKey()) && 
queue.getName().equals(b.getDestination()))
             {
                 return (b.getArguments() == null || b.getArguments().isEmpty())
-                       ? (arguments == null || arguments.isEmpty())
-                       : b.getArguments().equals(arguments);
+                        ? (arguments == null || arguments.isEmpty())
+                        : b.getArguments().equals(arguments);
             }
         }
         return false;
@@ -417,9 +428,9 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             bindingKey = "";
         }
 
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(bindingKey.equals(b.getBindingKey()) && 
queue.getName().equals(b.getDestination()))
+            if (bindingKey.equals(b.getBindingKey()) && 
queue.getName().equals(b.getDestination()))
             {
                 return true;
             }
@@ -435,9 +446,9 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             bindingKey = "";
         }
 
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(bindingKey.equals(b.getBindingKey()))
+            if (bindingKey.equals(b.getBindingKey()))
             {
                 return true;
             }
@@ -448,9 +459,9 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     @Override
     public boolean isBound(Queue<?> queue)
     {
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(queue.getName().equals(b.getDestination()))
+            if (queue.getName().equals(b.getDestination()))
             {
                 return true;
             }
@@ -461,12 +472,12 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     @Override
     public boolean isBound(Map<String, Object> arguments, Queue<?> queue)
     {
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(queue.getName().equals(b.getDestination()) &&
-               ((b.getArguments() == null || b.getArguments().isEmpty())
-                       ? (arguments == null || arguments.isEmpty())
-                       : b.getArguments().equals(arguments)))
+            if (queue.getName().equals(b.getDestination()) &&
+                ((b.getArguments() == null || b.getArguments().isEmpty())
+                        ? (arguments == null || arguments.isEmpty())
+                        : b.getArguments().equals(arguments)))
             {
                 return true;
             }
@@ -477,11 +488,11 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     @Override
     public boolean isBound(Map<String, Object> arguments)
     {
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(((b.getArguments() == null || b.getArguments().isEmpty())
-                                   ? (arguments == null || arguments.isEmpty())
-                                   : b.getArguments().equals(arguments)))
+            if (((b.getArguments() == null || b.getArguments().isEmpty())
+                    ? (arguments == null || arguments.isEmpty())
+                    : b.getArguments().equals(arguments)))
             {
                 return true;
             }
@@ -498,12 +509,12 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             bindingKey = "";
         }
 
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(b.getBindingKey().equals(bindingKey) &&
-               ((b.getArguments() == null || b.getArguments().isEmpty())
-                       ? (arguments == null || arguments.isEmpty())
-                       : b.getArguments().equals(arguments)))
+            if (b.getBindingKey().equals(bindingKey) &&
+                ((b.getArguments() == null || b.getArguments().isEmpty())
+                        ? (arguments == null || arguments.isEmpty())
+                        : b.getArguments().equals(arguments)))
             {
                 return true;
             }
@@ -534,7 +545,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     @SuppressWarnings("unused")
     private void postSetAlternateBinding()
     {
-        if(_alternateBinding != null)
+        if (_alternateBinding != null)
         {
             _alternateBindingDestination = 
getOpenedMessageDestination(_alternateBinding.getDestination());
             if (_alternateBindingDestination != null)
@@ -587,6 +598,11 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
         return getBindings().size();
     }
 
+    @Override
+    public long getProducerCount()
+    {
+        return _producerCount.get();
+    }
 
     @Override
     public <M extends ServerMessage<? extends StorableMessageMetaData>> 
RoutingResult<M> route(final M message,
@@ -655,7 +671,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
         }
         finally
         {
-            if(topLevel)
+            if (topLevel)
             {
                 CURRENT_ROUTING.set(null);
             }
@@ -664,9 +680,9 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
 
 
     protected abstract <M extends ServerMessage<? extends 
StorableMessageMetaData>> void doRoute(final M message,
-                                    final String routingAddress,
-                                    final InstanceProperties 
instanceProperties,
-                                    final RoutingResult<M> result);
+                                                                               
                  final String routingAddress,
+                                                                               
                  final InstanceProperties instanceProperties,
+                                                                               
                  final RoutingResult<M> result);
 
     @Override
     public boolean bind(final String destination,
@@ -695,7 +711,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             throw new IllegalArgumentException(String.format("Destination '%s' 
is not found.", destination));
         }
 
-        if(arguments == null)
+        if (arguments == null)
         {
             arguments = Collections.emptyMap();
         }
@@ -703,7 +719,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
         Binding newBinding = new BindingImpl(bindingKey, destination, 
arguments);
 
         Binding previousBinding = null;
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
             if (b.getBindingKey().equals(bindingKey) && 
b.getDestination().equals(messageDestination.getName()))
             {
@@ -717,9 +733,8 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             return false;
         }
 
-
         final BindingIdentifier bindingIdentifier = new 
BindingIdentifier(bindingKey, messageDestination);
-        if(previousBinding != null)
+        if (previousBinding != null)
         {
             onBindingUpdated(bindingIdentifier, arguments);
         }
@@ -738,7 +753,7 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             _bindings.remove(previousBinding);
         }
         _bindings.add(newBinding);
-        if(isDurable() && messageDestination.isDurable())
+        if (isDurable() && messageDestination.isDurable())
         {
             final Collection<Binding> durableBindings = getDurableBindings();
             attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
@@ -751,9 +766,9 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     {
         List<Binding> bindings = new ArrayList<>();
         final String destinationName = destination.getName();
-        for(Binding b : _bindings)
+        for (Binding b : _bindings)
         {
-            if(b.getDestination().equals(destinationName))
+            if (b.getDestination().equals(destinationName))
             {
                 bindings.add(b);
             }
@@ -765,13 +780,13 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     public Collection<Binding> getDurableBindings()
     {
         List<Binding> durableBindings;
-        if(isDurable())
+        if (isDurable())
         {
             durableBindings = new ArrayList<>();
             for (Binding b : _bindings)
             {
                 MessageDestination destination = 
getAttainedMessageDestination(b.getDestination());
-                if(destination != null && destination.isDurable())
+                if (destination != null && destination.isDurable())
                 {
                     durableBindings.add(b);
                 }
@@ -844,7 +859,6 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
             }
         }
         return false;
-
     }
 
     @Override
@@ -913,13 +927,14 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
 
     private boolean isAutoDeletePending()
     {
-        return (getLifetimePolicy() == 
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == 
LifetimePolicy.DELETE_ON_NO_LINKS )
-            && getBindingCount() == 0;
+        return (getLifetimePolicy() == 
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS ||
+                getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS) &&
+                getBindingCount() == 0;
     }
 
 
     @SuppressWarnings("unused")
-    @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, 
desiredState = State.ACTIVE)
+    @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE)
     private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
@@ -1030,13 +1045,14 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     @Override
     public void linkAdded(final MessageSender sender, final PublishingLink 
link)
     {
-        Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
-        if(oldValue != null)
+        final Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
+        if (oldValue != null)
         {
-            _linkedSenders.put(sender, oldValue+1);
+            _linkedSenders.put(sender, oldValue + 1);
         }
-        if( link.TYPE_LINK.equals(link.getType()))
+        if (link.TYPE_LINK.equals(link.getType()))
         {
+            _producerCount.incrementAndGet();
             getEventLogger().message(SenderMessages.CREATE(link.getName(), 
link.getDestination()));
         }
     }
@@ -1044,17 +1060,25 @@ public abstract class AbstractExchange<T extends 
AbstractExchange<T>>
     @Override
     public void linkRemoved(final MessageSender sender, final PublishingLink 
link)
     {
-        int oldValue = _linkedSenders.remove(sender);
-        if(oldValue != 1)
+        final int oldValue = _linkedSenders.remove(sender);
+        if (oldValue != 1)
         {
-            _linkedSenders.put(sender, oldValue-1);
+            _linkedSenders.put(sender, oldValue - 1);
         }
-        if( link.TYPE_LINK.equals(link.getType()))
+        if (link.TYPE_LINK.equals(link.getType()))
         {
+            _producerCount.decrementAndGet();
             getEventLogger().message(SenderMessages.CLOSE(link.getName(), 
link.getDestination()));
         }
     }
 
+    @Override
+    public void close()
+    {
+        _producerCount.set(0);
+        super.close();
+    }
+
     private void validateOrCreateAlternateBinding(final Exchange<?> exchange, 
final boolean mayCreate)
     {
         Object value = exchange.getAttribute(ALTERNATE_BINDING);
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
index 89532975e6..82225174e6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
@@ -80,6 +80,9 @@ public final class BrokerModel extends Model
      * 8.0
      *    Added new broker statistics: processCpuTime, processCpuLoad
      *    Added new context variables for queues and exchanges to configure 
behaviour on unknown declared arguments
+     *
+     * 9.0
+     *    Introduced Producer as a child of Session; added producerCount statistic to Exchange and Queue
      */
     public static final int MODEL_MAJOR_VERSION = 9;
     public static final int MODEL_MINOR_VERSION = 0;
@@ -138,6 +141,8 @@ public final class BrokerModel extends Model
 
         addRelationship(Connection.class, Session.class);
 
+        addRelationship(Session.class, Producer.class);
+
         addRelationship(Queue.class, Consumer.class);
 
         _objectFactory = new ConfiguredObjectFactoryImpl(this);
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 5e2d68f198..f95c6a3948 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -111,6 +111,9 @@ public interface Exchange<X extends Exchange<X>> extends 
ConfiguredObject<X>, Me
                       description = "Number of messages received by this 
exchange.", metricName = "inbound_messages_count")
     long getMessagesIn();
 
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.COUNT, label = "Producers",
+            description = "Number of producers to this exchange.", metricName 
= "producers_count")
+    long getProducerCount();
 
     @ManagedOperation(changesConfiguredObjectState = true,
                       description = "Bind a given destination to exchange 
using a given bindingKey and arguments."
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
new file mode 100644
index 0000000000..7235e6a14f
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+@ManagedObject(creatable = false, amqpName = "org.apache.qpid.Producer")
+public interface Producer<X extends Producer<X>> extends ConfiguredObject<X>
+{
+    enum DestinationType { EXCHANGE, QUEUE }
+
+    void registerMessageDelivered(long messageSize);
+
+    @DerivedAttribute(description = "Session ID")
+    String getSessionId();
+
+    @DerivedAttribute(description = "Session name")
+    String getSessionName();
+
+    @DerivedAttribute(description = "Connection principal")
+    String getPrincipal();
+
+    @DerivedAttribute(description = "Connection remote address")
+    String getRemoteAddress();
+
+    @DerivedAttribute(description = "Destination name")
+    String getDestination();
+
+    @DerivedAttribute(description = "Destination type (exchange or queue)")
+    DestinationType getDestinationType();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.MESSAGES)
+    int getMessagesOut();
+
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.BYTES)
+    long getBytesOut();
+
+    ListenableFuture<Void> deleteNoChecks();
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
new file mode 100644
index 0000000000..431b4e31d5
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.session.AbstractAMQPSession;
+
+// sonar complains about underscores in variable names
+@SuppressWarnings("java:S116")
+public class ProducerImpl<X extends Producer<X>>
+        extends AbstractConfiguredObject<ProducerImpl<X>>
+        implements Producer<ProducerImpl<X>>
+{
+    private final String _sessionId;
+
+    private final String _sessionName;
+
+    private final String _principal;
+
+    private final String _remoteAddress;
+
+    private final String _destination;
+
+    private final DestinationType _destinationType;
+
+    private final AtomicInteger _messagesOut = new AtomicInteger();
+
+    private final AtomicLong _bytesOut = new AtomicLong();
+
+    public ProducerImpl(final AbstractAMQPSession<?, ?> session,
+                        final PublishingLink publishingLink,
+                        final MessageDestination messageDestination)
+    {
+        super(session, createAttributeMap(publishingLink));
+        _sessionId = String.valueOf(session.getId());
+        _sessionName = session.getName();
+        _principal = session.getAMQPConnection().getPrincipal();
+        _remoteAddress = session.getAMQPConnection().getRemoteAddress();
+        _destination = messageDestination.getName();
+        _destinationType = messageDestination instanceof Exchange ? 
DestinationType.EXCHANGE : DestinationType.QUEUE;
+
+        registerWithParents();
+        open();
+    }
+
+    private static Map<String, Object> createAttributeMap(final PublishingLink 
publishingLink)
+    {
+        final Map<String, Object> attributes = new HashMap<>();
+        attributes.put(ID, UUID.randomUUID());
+        attributes.put(NAME, publishingLink.getName());
+        attributes.put(DURABLE, false);
+        attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
+        attributes.put(STATE, State.ACTIVE);
+        return attributes;
+    }
+
+    @SuppressWarnings("unused")
+    @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE)
+    private ListenableFuture<Void> activate()
+    {
+        setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
+    }
+
+    @Override
+    public ListenableFuture<Void> deleteNoChecks()
+    {
+        return super.deleteNoChecks();
+    }
+
+    @Override
+    public void registerMessageDelivered(long messageSize)
+    {
+        _messagesOut.incrementAndGet();
+        _bytesOut.addAndGet(messageSize);
+    }
+
+    @Override
+    public String getSessionId()
+    {
+        return _sessionId;
+    }
+
+    @Override
+    public String getSessionName()
+    {
+        return _sessionName;
+    }
+
+    @Override
+    public String getPrincipal()
+    {
+        return _principal;
+    }
+
+    @Override
+    public String getRemoteAddress()
+    {
+        return _remoteAddress;
+    }
+
+    @Override
+    public String getDestination()
+    {
+        return _destination;
+    }
+
+    @Override
+    public DestinationType getDestinationType()
+    {
+        return _destinationType;
+    }
+
+    @Override
+    public int getMessagesOut()
+    {
+        return _messagesOut.get();
+    }
+
+    @Override
+    public long getBytesOut()
+    {
+        return _bytesOut.get();
+    }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index c0a5f2f4fb..1e1f5db950 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -372,7 +372,6 @@ public interface Queue<X extends Queue<X>> extends 
ConfiguredObject<X>,
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = 
false, skipAclCheck = true)
     Collection<PublishingLink> getPublishingLinks();
 
-
     @Override
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = 
false, skipAclCheck = true)
     Collection<QueueConsumer<?,?>> getConsumers();
@@ -519,6 +518,10 @@ public interface Queue<X extends Queue<X>> extends 
ConfiguredObject<X>,
             description = "Total number of enqueued malformed messages.", 
metricName = "malformed_messages_count")
     long getTotalMalformedMessages();
 
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.COUNT, label = "Producers",
+            description = "Number of producers to this queue.", metricName = 
"producers_count")
+    long getProducerCount();
+
     @ManagedOperation(description = "move messages from this queue to 
another", changesConfiguredObjectState = false)
     List<Long> moveMessages(@Param(name = "destination", description = "The 
queue to which the messages should be moved", mandatory = true) Queue<?> 
destination,
                             @Param(name = "messageIds", description = "If 
provided, only messages in the queue whose (internal) message-id is supplied 
will be considered for moving") List<Long> messageIds,
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java 
b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 894df6a83c..36d73a817d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -95,4 +95,9 @@ public interface Session<X extends Session<X>> extends 
ConfiguredObject<X>
     @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = 
StatisticUnit.MESSAGES, label = "Transacted Outbound",
             description = "Total number of messages received by this session 
within a transaction.", metricName = "transacted_outbound_messages_count")
     long getTransactedMessagesOut();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.COUNT, label = "Producers",
+            description = "Number of producers on this session.", metricName 
= "producers_count")
+    long getProducerCount();
 }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java 
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index a0b6393997..a7471c3b3a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -50,6 +50,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
@@ -81,7 +82,6 @@ import org.apache.qpid.server.filter.SelectorParsingException;
 import org.apache.qpid.server.filter.selector.ParseException;
 import org.apache.qpid.server.filter.selector.TokenMgrError;
 import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.Outcome;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.QueueMessages;
@@ -279,6 +279,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     private final Set<DestinationReferrer> _referrers = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Set<LocalTransaction> _transactions = 
ConcurrentHashMap.newKeySet();
     private final LocalTransaction.LocalTransactionListener 
_localTransactionListener = _transactions::remove;
+    private final AtomicLong _producerCount = new AtomicLong();
 
     private boolean _closing;
     private Map<String, String> _mimeTypeToFileExtension = 
Collections.emptyMap();
@@ -1064,7 +1065,12 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         return super.beforeClose();
     }
 
-
+    @Override
+    public void close()
+    {
+        _producerCount.set(0);
+        super.close();
+    }
 
     <T extends ConsumerTarget<T>> void unregisterConsumer(final 
QueueConsumerImpl<T> consumer)
     {
@@ -1230,6 +1236,12 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         return _bindingCount;
     }
 
+    @Override
+    public long getProducerCount()
+    {
+        return _producerCount.get();
+    }
+
     @Override
     public LogSubject getLogSubject()
     {
@@ -3333,13 +3345,13 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(final 
Class<C> clazz)
     {
-        if(clazz == org.apache.qpid.server.model.Consumer.class)
+        if (clazz == org.apache.qpid.server.model.Consumer.class)
         {
             return _queueConsumerManager == null
                     ? Collections.<C>emptySet()
                     : (Collection<C>) 
Lists.newArrayList(_queueConsumerManager.getAllIterator());
         }
-        else return Collections.emptySet();
+        else return super.getChildren(clazz);
     }
 
     @Override
@@ -3806,17 +3818,17 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     @Override
     public void linkAdded(final MessageSender sender, final PublishingLink 
link)
     {
-
         Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
-        if(oldValue != null)
+        if (oldValue != null)
         {
             _linkedSenders.put(sender, oldValue+1);
         }
-        if( link.TYPE_LINK.equals(link.getType()))
+        if (link.TYPE_LINK.equals(link.getType()))
         {
+            _producerCount.incrementAndGet();
             getEventLogger().message(SenderMessages.CREATE(link.getName(), 
link.getDestination()));
         }
-        if(Binding.TYPE.equals(link.getType()))
+        if (Binding.TYPE.equals(link.getType()))
         {
             _bindingCount++;
         }
@@ -3830,8 +3842,9 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         {
             _linkedSenders.put(sender, oldValue-1);
         }
-        if( link.TYPE_LINK.equals(link.getType()))
+        if (link.TYPE_LINK.equals(link.getType()))
         {
+            _producerCount.decrementAndGet();
             getEventLogger().message(SenderMessages.CLOSE(link.getName(), 
link.getDestination()));
         }
         if(Binding.TYPE.equals(link.getType()))
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
 
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 05dd46e57b..72840e1336 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -39,6 +39,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -50,12 +51,16 @@ import org.apache.qpid.server.logging.Outcome;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Producer;
+import org.apache.qpid.server.model.ProducerImpl;
+import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
@@ -63,6 +68,7 @@ import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
                                           X extends ConsumerTarget<X>>
@@ -93,6 +99,7 @@ public abstract class AbstractAMQPSession<S extends 
AbstractAMQPSession<S, X>,
     private final AtomicLong _transactedMessagesOut = new AtomicLong();
     private final AtomicLong _bytesIn = new AtomicLong();
     private final AtomicLong _bytesOut = new AtomicLong();
+    private final AtomicLong _producerCount = new AtomicLong();
 
     protected AbstractAMQPSession(final Connection<?> parent, final int 
sessionId)
     {
@@ -380,6 +387,67 @@ public abstract class AbstractAMQPSession<S extends 
AbstractAMQPSession<S, X>,
         _connection.registerTransactedMessageReceived();
     }
 
+    @Override
+    public long getProducerCount()
+    {
+        return _producerCount.get();
+    }
+
+    public Producer<?> addProducer(final PublishingLink link, final 
MessageDestination messageDestination)
+    {
+        if (link.TYPE_LINK.equals(link.getType()))
+        {
+            _producerCount.incrementAndGet();
+            return createProducer(this, link, messageDestination);
+        }
+        return null;
+    }
+
+    public void removeProducer(final PublishingLink link)
+    {
+        final Producer<?> producer = getChildByName(Producer.class, 
link.getName());
+        if (producer != null)
+        {
+            producer.deleteNoChecks();
+            _producerCount.decrementAndGet();
+        }
+    }
+
+    private Producer<?> createProducer(final AbstractAMQPSession<?, ?> session,
+                                       final PublishingLink publishingLink,
+                                       final MessageDestination 
messageDestination)
+            throws ConnectionScopedRuntimeException
+    {
+        return getTaskExecutor().run(new Task<Producer<?>, 
ConnectionScopedRuntimeException>()
+        {
+            @Override
+            public Producer<?> execute()
+            {
+                return new ProducerImpl<>(session, publishingLink, 
messageDestination);
+            }
+
+            @Override
+            public String getObject()
+            {
+                return AbstractAMQPSession.this.toString();
+            }
+
+            @Override
+            public String getAction()
+            {
+                return "create producer";
+            }
+
+            @Override
+            public String getArguments()
+            {
+                return "session=" + session +
+                       ", publishingLink=" + publishingLink +
+                       ", messageDestination=" + messageDestination;
+            }
+        });
+    }
+
     @Override
     protected void logCreated(final Map<String, Object> attributes,
                               final Outcome outcome)
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java 
b/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java
new file mode 100644
index 0000000000..9cd67509c1
--- /dev/null
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.session.AMQPSession;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+@SuppressWarnings({"rawtypes"})
+public class ProducerTest extends UnitTestBase
+{
+    private QueueManagingVirtualHost<?> _virtualHost;
+    private Exchange<?> _exchange;
+    private Queue<?> _queue;
+    private PublishingLink _link;
+    private MessageSender _sender;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _virtualHost = BrokerTestHelper.createVirtualHost("test", this);
+
+        final Map<String,Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, "test");
+        attributes.put(Exchange.DURABLE, false);
+        attributes.put(Exchange.TYPE, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        _exchange = _virtualHost.createChild(Exchange.class, attributes);
+
+        final Map<String,Object> queueAttributes = new HashMap<>();
+        queueAttributes.put(Queue.NAME, "queue");
+        queueAttributes.put(Queue.DURABLE, false);
+        queueAttributes.put(Queue.TYPE, "standard");
+        _queue = _virtualHost.createChild(Queue.class, queueAttributes);
+
+        final AMQPConnection connection = mock(AMQPConnection.class);
+        when(connection.getPrincipal()).thenReturn("test-principal");
+        when(connection.getRemoteAddress()).thenReturn("test-remote-address");
+
+        final AMQPSession session = mock(AMQPSession.class);
+        when(session.getId()).thenReturn(UUID.randomUUID());
+        when(session.getName()).thenReturn("test-session");
+        when(session.getAMQPConnection()).thenReturn(connection);
+
+        _link = mock(PublishingLink.class);
+        when(_link.getName()).thenReturn("test-link");
+        when(_link.getType()).thenReturn(PublishingLink.TYPE_LINK);
+
+        _sender = mock(MessageSender.class);
+    }
+
+    @Test
+    public void addAndRemoveLinkToExchange()
+    {
+        assertEquals(0, _exchange.getProducerCount());
+        _exchange.linkAdded(_sender, _link);
+        assertEquals(1, _exchange.getProducerCount());
+        _exchange.linkRemoved(_sender, _link);
+        assertEquals(0, _exchange.getProducerCount());
+    }
+
+    @Test
+    public void addAndRemoveLinkToQueue()
+    {
+        assertEquals(0, _queue.getProducerCount());
+        _queue.linkAdded(_sender, _link);
+        assertEquals(1, _queue.getProducerCount());
+        _queue.linkRemoved(_sender, _link);
+        assertEquals(0, _queue.getProducerCount());
+    }
+
+    @Test
+    public void closeExchange()
+    {
+        final Map<String,Object> attributes = new HashMap<>();
+        attributes.put(Exchange.NAME, "temporary");
+        attributes.put(Exchange.DURABLE, false);
+        attributes.put(Exchange.TYPE, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+        final Exchange exchange = _virtualHost.createChild(Exchange.class, 
attributes);
+        exchange.linkAdded(_sender, _link);
+        exchange.close();
+
+        assertEquals(0, exchange.getProducerCount());
+    }
+
+    @Test
+    public void closeQueue()
+    {
+        final Map<String,Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, "temporary");
+        attributes.put(Queue.DURABLE, false);
+        attributes.put(Queue.TYPE, "standard");
+
+        final Queue queue = _virtualHost.createChild(Queue.class, attributes);
+        queue.linkAdded(_sender, _link);
+        queue.close();
+
+        assertEquals(0, queue.getProducerCount());
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 9724850a67..73b2c86dc2 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,10 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
@@ -46,10 +43,10 @@ import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Producer;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
-import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -71,7 +68,6 @@ import 
org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -93,6 +89,8 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
     private volatile ReceivingDestination _receivingDestination;
     private volatile boolean _rejectedOutcomeSupportedBySource;
 
+    private Producer<?> _producer;
+
     private final PublishingLink _publishingLink = new PublishingLink()
     {
         @Override
@@ -148,7 +146,6 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
         setCreditWindow();
     }
 
-
     private TerminusDurability getDurability()
     {
         return getTarget().getDurable();
@@ -285,6 +282,10 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
                                                            transaction,
                                                            
session.getSecurityToken());
                             outcome = ACCEPTED;
+                            if (_producer != null)
+                            {
+                                
_producer.registerMessageDelivered(serverMessage.getSizeIncludingHeader());
+                            }
                         }
                         catch (UnroutableMessageException e)
                         {
@@ -500,14 +501,16 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
         {
             if (_receivingDestination != null && 
_receivingDestination.getMessageDestination() != null)
             {
+                getSession().removeProducer(_publishingLink);
+                _producer = null;
                 
_receivingDestination.getMessageDestination().linkRemoved(_messageSender, 
_publishingLink);
             }
             _receivingDestination = receivingDestination;
-            if(receivingDestination != null && 
receivingDestination.getMessageDestination() != null)
+            if (receivingDestination != null && 
receivingDestination.getMessageDestination() != null)
             {
+                _producer = getSession().addProducer(_publishingLink, 
receivingDestination.getMessageDestination());
                 
receivingDestination.getMessageDestination().linkAdded(_messageSender, 
_publishingLink);
             }
-
         }
     }
 
diff --git 
a/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml 
b/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml
index dc1a544bf5..4adc5990bd 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml
@@ -87,6 +87,7 @@
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" 
href="management/managing/Java-Broker-Management-Managing-Exchanges.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" 
href="management/managing/Java-Broker-Management-Managing-Queues.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" 
href="management/managing/Java-Broker-Management-Managing-Consumers.xml"/>
+  <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" 
href="management/managing/Java-Broker-Management-Managing-Producers.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" 
href="management/managing/Java-Broker-Management-Managing-Ports.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" 
href="management/managing/Java-Broker-Management-Managing-Authentication-Providers.xml"/>
   <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" 
href="management/managing/Java-Broker-Management-Managing-Keystores.xml"/>
diff --git 
a/doc/java-broker/src/docbkx/management/managing/Java-Broker-Management-Managing-Producers.xml
 
b/doc/java-broker/src/docbkx/management/managing/Java-Broker-Management-Managing-Producers.xml
new file mode 100644
index 0000000000..478fa7c103
--- /dev/null
+++ 
b/doc/java-broker/src/docbkx/management/managing/Java-Broker-Management-Managing-Producers.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<section xmlns="http://docbook.org/ns/docbook" version="5.0" 
xml:id="Java-Broker-Management-Managing-Producers">
+  <title>Producers</title>
+  <para>A Producer represents an application sending messages to an exchange 
or a queue. Its presence
+    in the model indicates that an application is currently connected to the 
exchange or to the queue <emphasis>at this moment</emphasis>.
+    Producers are created when using the AMQP 1.0 protocol.
+  </para>
+</section>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to