This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new 4e9bac0654 QPID-8602: [Broker-J] Show producer details in REST API
(#142)
4e9bac0654 is described below
commit 4e9bac0654125286de8653ff1b6bddb70264489b
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Tue Oct 11 16:00:36 2022 +0200
QPID-8602: [Broker-J] Show producer details in REST API (#142)
---
.../qpid/server/exchange/AbstractExchange.java | 172 ++++++++++++---------
.../org/apache/qpid/server/model/BrokerModel.java | 5 +
.../org/apache/qpid/server/model/Exchange.java | 3 +
.../org/apache/qpid/server/model/Producer.java | 57 +++++++
.../org/apache/qpid/server/model/ProducerImpl.java | 152 ++++++++++++++++++
.../java/org/apache/qpid/server/model/Queue.java | 5 +-
.../java/org/apache/qpid/server/model/Session.java | 5 +
.../apache/qpid/server/queue/AbstractQueue.java | 31 ++--
.../qpid/server/session/AbstractAMQPSession.java | 68 ++++++++
.../apache/qpid/server/exchange/ProducerTest.java | 135 ++++++++++++++++
.../v1_0/StandardReceivingLinkEndpoint.java | 19 ++-
.../Java-Broker-Management-Managing-Entities.xml | 1 +
.../Java-Broker-Management-Managing-Producers.xml | 29 ++++
13 files changed, 590 insertions(+), 92 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 17f62f91fe..32a7ce1502 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -107,7 +107,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
private static final Operation PUBLISH_ACTION =
Operation.PERFORM_ACTION("publish");
private final AtomicBoolean _closed = new AtomicBoolean();
- @ManagedAttributeField(beforeSet = "preSetAlternateBinding", afterSet =
"postSetAlternateBinding" )
+ @ManagedAttributeField(beforeSet = "preSetAlternateBinding", afterSet =
"postSetAlternateBinding")
private AlternateBinding _alternateBinding;
@ManagedAttributeField
private UnroutableMessageBehaviour _unroutableMessageBehaviour;
@@ -123,7 +123,8 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
//The logSubject for ths exchange
private LogSubject _logSubject;
- private final Set<DestinationReferrer> _referrers =
Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
+ private final Set<DestinationReferrer> _referrers =
+ Collections.newSetFromMap(new
ConcurrentHashMap<DestinationReferrer, Boolean>());
private final AtomicLong _receivedMessageCount = new AtomicLong();
private final AtomicLong _receivedMessageSize = new AtomicLong();
@@ -131,6 +132,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
private final AtomicLong _routedMessageSize = new AtomicLong();
private final AtomicLong _droppedMessageCount = new AtomicLong();
private final AtomicLong _droppedMessageSize = new AtomicLong();
+ private final AtomicLong _producerCount = new AtomicLong();
private final List<Binding> _bindings = new CopyOnWriteArrayList<>();
@@ -143,7 +145,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
super(vhost, attributes);
Set<String> providedAttributeNames = new
HashSet<>(attributes.keySet());
providedAttributeNames.removeAll(getAttributeNames());
- if(!providedAttributeNames.isEmpty())
+ if (!providedAttributeNames.isEmpty())
{
throw new IllegalArgumentException("Unknown attributes provided: "
+ providedAttributeNames);
}
@@ -157,7 +159,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
{
super.onValidate();
- if(!isSystemProcess())
+ if (!isSystemProcess())
{
if (isReservedExchangeName(getName()))
{
@@ -173,11 +175,11 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
validateOrCreateAlternateBinding(((Exchange<?>) proxyForValidation),
false);
- if (changedAttributes.contains(ConfiguredObject.DESIRED_STATE) &&
proxyForValidation.getDesiredState() == State.DELETED)
+ if (changedAttributes.contains(ConfiguredObject.DESIRED_STATE) &&
+ proxyForValidation.getDesiredState() == State.DELETED)
{
doChecks();
}
-
}
private boolean isReservedExchangeName(String name)
@@ -192,7 +194,8 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
super.validateOnCreate();
if (getCreatingLinkInfo() != null && !isSystemProcess())
{
- throw new IllegalConfigurationException(String.format("Cannot
specify creatingLinkInfo for exchange '%s'", getName()));
+ throw new IllegalConfigurationException(String.format("Cannot
specify creatingLinkInfo for exchange '%s'",
+ getName()));
}
}
@@ -208,7 +211,10 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
{
super.onOpen();
final ConfiguredDerivedMethodAttribute<Exchange<?>,
Collection<Binding>> durableBindingsAttribute =
- (ConfiguredDerivedMethodAttribute<Exchange<?>,
Collection<Binding>>)
getModel().getTypeRegistry().getAttributeTypes(getTypeClass()).get(DURABLE_BINDINGS);
+ (ConfiguredDerivedMethodAttribute<Exchange<?>,
Collection<Binding>>) getModel().getTypeRegistry()
+
.getAttributeTypes(
+
getTypeClass())
+
.get(DURABLE_BINDINGS);
final Collection<Binding> bindings =
durableBindingsAttribute.convertValue(getActualAttributes().get(DURABLE_BINDINGS),
this);
if (bindings != null)
@@ -219,7 +225,8 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
final MessageDestination messageDestination =
getOpenedMessageDestination(b.getDestination());
if (messageDestination != null)
{
- Map<String, Object> arguments = b.getArguments() == null ?
Collections.emptyMap() : b.getArguments();
+ Map<String, Object> arguments =
+ b.getArguments() == null ? Collections.emptyMap()
: b.getArguments();
try
{
onBind(new BindingIdentifier(b.getBindingKey(),
messageDestination), arguments);
@@ -240,11 +247,13 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
final LinkModel link;
if (_creatingLinkInfo.isSendingLink())
{
- link =
_virtualHost.getSendingLink(_creatingLinkInfo.getRemoteContainerId(),
_creatingLinkInfo.getLinkName());
+ link =
_virtualHost.getSendingLink(_creatingLinkInfo.getRemoteContainerId(),
+
_creatingLinkInfo.getLinkName());
}
else
{
- link =
_virtualHost.getReceivingLink(_creatingLinkInfo.getRemoteContainerId(),
_creatingLinkInfo.getLinkName());
+ link =
_virtualHost.getReceivingLink(_creatingLinkInfo.getRemoteContainerId(),
+
_creatingLinkInfo.getLinkName());
}
addLifetimeConstraint(link);
}
@@ -266,7 +275,9 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
}
else
{
- LOGGER.warn("Cannot find alternate binding destination '{}'
for exchange '{}'", alternateDestination, toString());
+ LOGGER.warn("Cannot find alternate binding destination '{}'
for exchange '{}'",
+ alternateDestination,
+ toString());
}
}
}
@@ -308,19 +319,19 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
private void performDelete()
{
- if(_closed.compareAndSet(false,true))
+ if (_closed.compareAndSet(false, true))
{
performDeleteTasks();
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
final MessageDestination messageDestination =
getAttainedMessageDestination(b.getDestination());
- if(messageDestination != null)
+ if (messageDestination != null)
{
messageDestination.linkRemoved(this, b);
}
}
- for(MessageSender sender : _linkedSenders.keySet())
+ for (MessageSender sender : _linkedSenders.keySet())
{
sender.destinationRemoved(this);
}
@@ -334,12 +345,12 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
private void doChecks()
{
- if(hasReferrers())
+ if (hasReferrers())
{
throw new MessageDestinationIsAlternateException(getName());
}
- if(isReservedExchangeName(getName()))
+ if (isReservedExchangeName(getName()))
{
throw new RequiredExchangeException(getName());
}
@@ -347,13 +358,13 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@Override
@DoOnConfigThread
- public void destinationRemoved(@Param(name="destination") final
MessageDestination destination)
+ public void destinationRemoved(@Param(name = "destination") final
MessageDestination destination)
{
Iterator<Binding> bindingIterator = _bindings.iterator();
- while(bindingIterator.hasNext())
+ while (bindingIterator.hasNext())
{
Binding b = bindingIterator.next();
- if(b.getDestination().equals(destination.getName()))
+ if (b.getDestination().equals(destination.getName()))
{
final Map<String, Object> bindArguments =
UNBIND_ARGUMENTS_CREATOR.createMap(b.getBindingKey(),
destination);
@@ -362,7 +373,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
_bindings.remove(b);
}
}
- if(!autoDeleteIfNecessary())
+ if (!autoDeleteIfNecessary())
{
if (destination.isDurable() && isDurable())
{
@@ -381,7 +392,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@Override
public String toString()
{
- return getClass().getSimpleName() + "[" + getName() +"]";
+ return getClass().getSimpleName() + "[" + getName() + "]";
}
@Override
@@ -391,19 +402,19 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
}
@Override
- public boolean isBound(String bindingKey, Map<String,Object> arguments,
Queue<?> queue)
+ public boolean isBound(String bindingKey, Map<String, Object> arguments,
Queue<?> queue)
{
if (bindingKey == null)
{
bindingKey = "";
}
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(bindingKey.equals(b.getBindingKey()) &&
queue.getName().equals(b.getDestination()))
+ if (bindingKey.equals(b.getBindingKey()) &&
queue.getName().equals(b.getDestination()))
{
return (b.getArguments() == null || b.getArguments().isEmpty())
- ? (arguments == null || arguments.isEmpty())
- : b.getArguments().equals(arguments);
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments);
}
}
return false;
@@ -417,9 +428,9 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
bindingKey = "";
}
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(bindingKey.equals(b.getBindingKey()) &&
queue.getName().equals(b.getDestination()))
+ if (bindingKey.equals(b.getBindingKey()) &&
queue.getName().equals(b.getDestination()))
{
return true;
}
@@ -435,9 +446,9 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
bindingKey = "";
}
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(bindingKey.equals(b.getBindingKey()))
+ if (bindingKey.equals(b.getBindingKey()))
{
return true;
}
@@ -448,9 +459,9 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@Override
public boolean isBound(Queue<?> queue)
{
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(queue.getName().equals(b.getDestination()))
+ if (queue.getName().equals(b.getDestination()))
{
return true;
}
@@ -461,12 +472,12 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@Override
public boolean isBound(Map<String, Object> arguments, Queue<?> queue)
{
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(queue.getName().equals(b.getDestination()) &&
- ((b.getArguments() == null || b.getArguments().isEmpty())
- ? (arguments == null || arguments.isEmpty())
- : b.getArguments().equals(arguments)))
+ if (queue.getName().equals(b.getDestination()) &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
{
return true;
}
@@ -477,11 +488,11 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@Override
public boolean isBound(Map<String, Object> arguments)
{
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(((b.getArguments() == null || b.getArguments().isEmpty())
- ? (arguments == null || arguments.isEmpty())
- : b.getArguments().equals(arguments)))
+ if (((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
{
return true;
}
@@ -498,12 +509,12 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
bindingKey = "";
}
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(b.getBindingKey().equals(bindingKey) &&
- ((b.getArguments() == null || b.getArguments().isEmpty())
- ? (arguments == null || arguments.isEmpty())
- : b.getArguments().equals(arguments)))
+ if (b.getBindingKey().equals(bindingKey) &&
+ ((b.getArguments() == null || b.getArguments().isEmpty())
+ ? (arguments == null || arguments.isEmpty())
+ : b.getArguments().equals(arguments)))
{
return true;
}
@@ -534,7 +545,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@SuppressWarnings("unused")
private void postSetAlternateBinding()
{
- if(_alternateBinding != null)
+ if (_alternateBinding != null)
{
_alternateBindingDestination =
getOpenedMessageDestination(_alternateBinding.getDestination());
if (_alternateBindingDestination != null)
@@ -587,6 +598,11 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
return getBindings().size();
}
+ @Override
+ public long getProducerCount()
+ {
+ return _producerCount.get();
+ }
@Override
public <M extends ServerMessage<? extends StorableMessageMetaData>>
RoutingResult<M> route(final M message,
@@ -655,7 +671,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
}
finally
{
- if(topLevel)
+ if (topLevel)
{
CURRENT_ROUTING.set(null);
}
@@ -664,9 +680,9 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
protected abstract <M extends ServerMessage<? extends
StorableMessageMetaData>> void doRoute(final M message,
- final String routingAddress,
- final InstanceProperties
instanceProperties,
- final RoutingResult<M> result);
+
final String routingAddress,
+
final InstanceProperties instanceProperties,
+
final RoutingResult<M> result);
@Override
public boolean bind(final String destination,
@@ -695,7 +711,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
throw new IllegalArgumentException(String.format("Destination '%s'
is not found.", destination));
}
- if(arguments == null)
+ if (arguments == null)
{
arguments = Collections.emptyMap();
}
@@ -703,7 +719,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
Binding newBinding = new BindingImpl(bindingKey, destination,
arguments);
Binding previousBinding = null;
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
if (b.getBindingKey().equals(bindingKey) &&
b.getDestination().equals(messageDestination.getName()))
{
@@ -717,9 +733,8 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
return false;
}
-
final BindingIdentifier bindingIdentifier = new
BindingIdentifier(bindingKey, messageDestination);
- if(previousBinding != null)
+ if (previousBinding != null)
{
onBindingUpdated(bindingIdentifier, arguments);
}
@@ -738,7 +753,7 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
_bindings.remove(previousBinding);
}
_bindings.add(newBinding);
- if(isDurable() && messageDestination.isDurable())
+ if (isDurable() && messageDestination.isDurable())
{
final Collection<Binding> durableBindings = getDurableBindings();
attributeSet(DURABLE_BINDINGS, durableBindings, durableBindings);
@@ -751,9 +766,9 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
{
List<Binding> bindings = new ArrayList<>();
final String destinationName = destination.getName();
- for(Binding b : _bindings)
+ for (Binding b : _bindings)
{
- if(b.getDestination().equals(destinationName))
+ if (b.getDestination().equals(destinationName))
{
bindings.add(b);
}
@@ -765,13 +780,13 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
public Collection<Binding> getDurableBindings()
{
List<Binding> durableBindings;
- if(isDurable())
+ if (isDurable())
{
durableBindings = new ArrayList<>();
for (Binding b : _bindings)
{
MessageDestination destination =
getAttainedMessageDestination(b.getDestination());
- if(destination != null && destination.isDurable())
+ if (destination != null && destination.isDurable())
{
durableBindings.add(b);
}
@@ -844,7 +859,6 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
}
}
return false;
-
}
@Override
@@ -913,13 +927,14 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
private boolean isAutoDeletePending()
{
- return (getLifetimePolicy() ==
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() ==
LifetimePolicy.DELETE_ON_NO_LINKS )
- && getBindingCount() == 0;
+ return (getLifetimePolicy() ==
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS ||
+ getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS) &&
+ getBindingCount() == 0;
}
@SuppressWarnings("unused")
- @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED},
desiredState = State.ACTIVE)
+ @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED},
desiredState = State.ACTIVE)
private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
@@ -1030,13 +1045,14 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@Override
public void linkAdded(final MessageSender sender, final PublishingLink
link)
{
- Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
- if(oldValue != null)
+ final Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
+ if (oldValue != null)
{
- _linkedSenders.put(sender, oldValue+1);
+ _linkedSenders.put(sender, oldValue + 1);
}
- if( link.TYPE_LINK.equals(link.getType()))
+ if (link.TYPE_LINK.equals(link.getType()))
{
+ _producerCount.incrementAndGet();
getEventLogger().message(SenderMessages.CREATE(link.getName(),
link.getDestination()));
}
}
@@ -1044,17 +1060,25 @@ public abstract class AbstractExchange<T extends
AbstractExchange<T>>
@Override
public void linkRemoved(final MessageSender sender, final PublishingLink
link)
{
- int oldValue = _linkedSenders.remove(sender);
- if(oldValue != 1)
+ final int oldValue = _linkedSenders.remove(sender);
+ if (oldValue != 1)
{
- _linkedSenders.put(sender, oldValue-1);
+ _linkedSenders.put(sender, oldValue - 1);
}
- if( link.TYPE_LINK.equals(link.getType()))
+ if (link.TYPE_LINK.equals(link.getType()))
{
+ _producerCount.decrementAndGet();
getEventLogger().message(SenderMessages.CLOSE(link.getName(),
link.getDestination()));
}
}
+ @Override
+ public void close()
+ {
+ _producerCount.set(0);
+ super.close();
+ }
+
private void validateOrCreateAlternateBinding(final Exchange<?> exchange,
final boolean mayCreate)
{
Object value = exchange.getAttribute(ALTERNATE_BINDING);
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
index 89532975e6..82225174e6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
@@ -80,6 +80,9 @@ public final class BrokerModel extends Model
* 8.0
* Added new broker statistics: processCpuTime, processCpuLoad
* Added new context variables for queues and exchanges to configure
behaviour on unknown declared arguments
+ *
+ * 9.0
+ * Introduced PublishProducer as a child of Exchange and
PointToPointProducer as child of Queue
*/
public static final int MODEL_MAJOR_VERSION = 9;
public static final int MODEL_MINOR_VERSION = 0;
@@ -138,6 +141,8 @@ public final class BrokerModel extends Model
addRelationship(Connection.class, Session.class);
+ addRelationship(Session.class, Producer.class);
+
addRelationship(Queue.class, Consumer.class);
_objectFactory = new ConfiguredObjectFactoryImpl(this);
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 5e2d68f198..f95c6a3948 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -111,6 +111,9 @@ public interface Exchange<X extends Exchange<X>> extends
ConfiguredObject<X>, Me
description = "Number of messages received by this
exchange.", metricName = "inbound_messages_count")
long getMessagesIn();
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.COUNT, label = "Producers",
+ description = "Number of producers to this exchange.", metricName
= "producers_count")
+ long getProducerCount();
@ManagedOperation(changesConfiguredObjectState = true,
description = "Bind a given destination to exchange
using a given bindingKey and arguments."
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
new file mode 100644
index 0000000000..7235e6a14f
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+@ManagedObject(creatable = false, amqpName = "org.apache.qpid.Producer")
+public interface Producer<X extends Producer<X>> extends ConfiguredObject<X>
+{
+ enum DestinationType { EXCHANGE, QUEUE }
+
+ void registerMessageDelivered(long messageSize);
+
+ @DerivedAttribute(description = "Session ID")
+ String getSessionId();
+
+ @DerivedAttribute(description = "Session name")
+ String getSessionName();
+
+ @DerivedAttribute(description = "Connection principal")
+ String getPrincipal();
+
+ @DerivedAttribute(description = "Connection remote address")
+ String getRemoteAddress();
+
+ @DerivedAttribute(description = "Destination name")
+ String getDestination();
+
+ @DerivedAttribute(description = "Destination type (exchange or queue)")
+ DestinationType getDestinationType();
+
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.MESSAGES)
+ int getMessagesOut();
+
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.BYTES)
+ long getBytesOut();
+
+ ListenableFuture<Void> deleteNoChecks();
+}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
new file mode 100644
index 0000000000..431b4e31d5
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.session.AbstractAMQPSession;
+
+// sonar complains about underscores in variable names
+@SuppressWarnings("java:S116")
+public class ProducerImpl<X extends Producer<X>>
+ extends AbstractConfiguredObject<ProducerImpl<X>>
+ implements Producer<ProducerImpl<X>>
+{
+ private final String _sessionId;
+
+ private final String _sessionName;
+
+ private final String _principal;
+
+ private final String _remoteAddress;
+
+ private final String _destination;
+
+ private final DestinationType _destinationType;
+
+ private final AtomicInteger _messagesOut = new AtomicInteger();
+
+ private final AtomicLong _bytesOut = new AtomicLong();
+
+ public ProducerImpl(final AbstractAMQPSession<?, ?> session,
+ final PublishingLink publishingLink,
+ final MessageDestination messageDestination)
+ {
+ super(session, createAttributeMap(publishingLink));
+ _sessionId = String.valueOf(session.getId());
+ _sessionName = session.getName();
+ _principal = session.getAMQPConnection().getPrincipal();
+ _remoteAddress = session.getAMQPConnection().getRemoteAddress();
+ _destination = messageDestination.getName();
+ _destinationType = messageDestination instanceof Exchange ?
DestinationType.EXCHANGE : DestinationType.QUEUE;
+
+ registerWithParents();
+ open();
+ }
+
+ private static Map<String, Object> createAttributeMap(final PublishingLink
publishingLink)
+ {
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(ID, UUID.randomUUID());
+ attributes.put(NAME, publishingLink.getName());
+ attributes.put(DURABLE, false);
+ attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
+ attributes.put(STATE, State.ACTIVE);
+ return attributes;
+ }
+
+ @SuppressWarnings("unused")
+ @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED},
desiredState = State.ACTIVE)
+ private ListenableFuture<Void> activate()
+ {
+ setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
+ }
+
+ @Override
+ public ListenableFuture<Void> deleteNoChecks()
+ {
+ return super.deleteNoChecks();
+ }
+
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
+ _messagesOut.incrementAndGet();
+ _bytesOut.addAndGet(messageSize);
+ }
+
+ @Override
+ public String getSessionId()
+ {
+ return _sessionId;
+ }
+
+ @Override
+ public String getSessionName()
+ {
+ return _sessionName;
+ }
+
+ @Override
+ public String getPrincipal()
+ {
+ return _principal;
+ }
+
+ @Override
+ public String getRemoteAddress()
+ {
+ return _remoteAddress;
+ }
+
+ @Override
+ public String getDestination()
+ {
+ return _destination;
+ }
+
+ @Override
+ public DestinationType getDestinationType()
+ {
+ return _destinationType;
+ }
+
+ @Override
+ public int getMessagesOut()
+ {
+ return _messagesOut.get();
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return _bytesOut.get();
+ }
+}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index c0a5f2f4fb..1e1f5db950 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -372,7 +372,6 @@ public interface Queue<X extends Queue<X>> extends
ConfiguredObject<X>,
@ManagedOperation(nonModifying = true, changesConfiguredObjectState =
false, skipAclCheck = true)
Collection<PublishingLink> getPublishingLinks();
-
@Override
@ManagedOperation(nonModifying = true, changesConfiguredObjectState =
false, skipAclCheck = true)
Collection<QueueConsumer<?,?>> getConsumers();
@@ -519,6 +518,10 @@ public interface Queue<X extends Queue<X>> extends
ConfiguredObject<X>,
description = "Total number of enqueued malformed messages.",
metricName = "malformed_messages_count")
long getTotalMalformedMessages();
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.COUNT, label = "Producers",
+ description = "Number of producers to this queue.", metricName =
"producers_count")
+ long getProducerCount();
+
@ManagedOperation(description = "move messages from this queue to
another", changesConfiguredObjectState = false)
List<Long> moveMessages(@Param(name = "destination", description = "The
queue to which the messages should be moved", mandatory = true) Queue<?>
destination,
@Param(name = "messageIds", description = "If
provided, only messages in the queue whose (internal) message-id is supplied
will be considered for moving") List<Long> messageIds,
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 894df6a83c..36d73a817d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -95,4 +95,9 @@ public interface Session<X extends Session<X>> extends
ConfiguredObject<X>
@ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units =
StatisticUnit.MESSAGES, label = "Transacted Outbound",
description = "Total number of messages received by this session
within a transaction.", metricName = "transacted_outbound_messages_count")
long getTransactedMessagesOut();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.COUNT, label = "Producers",
+ description = "Number of producers to this exchange.", metricName
= "producers_count")
+ long getProducerCount();
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index a0b6393997..a7471c3b3a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -50,6 +50,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
@@ -81,7 +82,6 @@ import org.apache.qpid.server.filter.SelectorParsingException;
import org.apache.qpid.server.filter.selector.ParseException;
import org.apache.qpid.server.filter.selector.TokenMgrError;
import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.Outcome;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.QueueMessages;
@@ -279,6 +279,7 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
private final Set<DestinationReferrer> _referrers =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<LocalTransaction> _transactions =
ConcurrentHashMap.newKeySet();
private final LocalTransaction.LocalTransactionListener
_localTransactionListener = _transactions::remove;
+ private final AtomicLong _producerCount = new AtomicLong();
private boolean _closing;
private Map<String, String> _mimeTypeToFileExtension =
Collections.emptyMap();
@@ -1064,7 +1065,12 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
return super.beforeClose();
}
-
+ @Override
+ public void close()
+ {
+ _producerCount.set(0);
+ super.close();
+ }
<T extends ConsumerTarget<T>> void unregisterConsumer(final
QueueConsumerImpl<T> consumer)
{
@@ -1230,6 +1236,12 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
return _bindingCount;
}
+ @Override
+ public long getProducerCount()
+ {
+ return _producerCount.get();
+ }
+
@Override
public LogSubject getLogSubject()
{
@@ -3333,13 +3345,13 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(final
Class<C> clazz)
{
- if(clazz == org.apache.qpid.server.model.Consumer.class)
+ if (clazz == org.apache.qpid.server.model.Consumer.class)
{
return _queueConsumerManager == null
? Collections.<C>emptySet()
: (Collection<C>)
Lists.newArrayList(_queueConsumerManager.getAllIterator());
}
- else return Collections.emptySet();
+ else return super.getChildren(clazz);
}
@Override
@@ -3806,17 +3818,17 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
@Override
public void linkAdded(final MessageSender sender, final PublishingLink
link)
{
-
Integer oldValue = _linkedSenders.putIfAbsent(sender, 1);
- if(oldValue != null)
+ if (oldValue != null)
{
_linkedSenders.put(sender, oldValue+1);
}
- if( link.TYPE_LINK.equals(link.getType()))
+ if (link.TYPE_LINK.equals(link.getType()))
{
+ _producerCount.incrementAndGet();
getEventLogger().message(SenderMessages.CREATE(link.getName(),
link.getDestination()));
}
- if(Binding.TYPE.equals(link.getType()))
+ if (Binding.TYPE.equals(link.getType()))
{
_bindingCount++;
}
@@ -3830,8 +3842,9 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
{
_linkedSenders.put(sender, oldValue-1);
}
- if( link.TYPE_LINK.equals(link.getType()))
+ if (link.TYPE_LINK.equals(link.getType()))
{
+ _producerCount.decrementAndGet();
getEventLogger().message(SenderMessages.CLOSE(link.getName(),
link.getDestination()));
}
if(Binding.TYPE.equals(link.getType()))
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index 05dd46e57b..72840e1336 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -39,6 +39,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -50,12 +51,16 @@ import org.apache.qpid.server.logging.Outcome;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Producer;
+import org.apache.qpid.server.model.ProducerImpl;
+import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
@@ -63,6 +68,7 @@ import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
X extends ConsumerTarget<X>>
@@ -93,6 +99,7 @@ public abstract class AbstractAMQPSession<S extends
AbstractAMQPSession<S, X>,
private final AtomicLong _transactedMessagesOut = new AtomicLong();
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
+ private final AtomicLong _producerCount = new AtomicLong();
protected AbstractAMQPSession(final Connection<?> parent, final int
sessionId)
{
@@ -380,6 +387,67 @@ public abstract class AbstractAMQPSession<S extends
AbstractAMQPSession<S, X>,
_connection.registerTransactedMessageReceived();
}
+ @Override
+ public long getProducerCount()
+ {
+ return _producerCount.get();
+ }
+
+ public Producer<?> addProducer(final PublishingLink link, final
MessageDestination messageDestination)
+ {
+ if (link.TYPE_LINK.equals(link.getType()))
+ {
+ _producerCount.incrementAndGet();
+ return createProducer(this, link, messageDestination);
+ }
+ return null;
+ }
+
+ public void removeProducer(final PublishingLink link)
+ {
+ final Producer<?> producer = getChildByName(Producer.class,
link.getName());
+ if (producer != null)
+ {
+ producer.deleteNoChecks();
+ _producerCount.decrementAndGet();
+ }
+ }
+
+ private Producer<?> createProducer(final AbstractAMQPSession<?, ?> session,
+ final PublishingLink publishingLink,
+ final MessageDestination
messageDestination)
+ throws ConnectionScopedRuntimeException
+ {
+ return getTaskExecutor().run(new Task<Producer<?>,
ConnectionScopedRuntimeException>()
+ {
+ @Override
+ public Producer<?> execute()
+ {
+ return new ProducerImpl<>(session, publishingLink,
messageDestination);
+ }
+
+ @Override
+ public String getObject()
+ {
+ return AbstractAMQPSession.this.toString();
+ }
+
+ @Override
+ public String getAction()
+ {
+ return "create producer";
+ }
+
+ @Override
+ public String getArguments()
+ {
+ return "session=" + session +
+ ", publishingLink=" + publishingLink +
+ ", messageDestination=" + messageDestination;
+ }
+ });
+ }
+
@Override
protected void logCreated(final Map<String, Object> attributes,
final Outcome outcome)
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java
b/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java
new file mode 100644
index 0000000000..9cd67509c1
--- /dev/null
+++
b/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.message.MessageSender;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.PublishingLink;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.session.AMQPSession;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+@SuppressWarnings({"rawtypes"})
+public class ProducerTest extends UnitTestBase
+{
+ private QueueManagingVirtualHost<?> _virtualHost;
+ private Exchange<?> _exchange;
+ private Queue<?> _queue;
+ private PublishingLink _link;
+ private MessageSender _sender;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _virtualHost = BrokerTestHelper.createVirtualHost("test", this);
+
+ final Map<String,Object> attributes = new HashMap<>();
+ attributes.put(Exchange.NAME, "test");
+ attributes.put(Exchange.DURABLE, false);
+ attributes.put(Exchange.TYPE, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ _exchange = _virtualHost.createChild(Exchange.class, attributes);
+
+ final Map<String,Object> queueAttributes = new HashMap<>();
+ queueAttributes.put(Queue.NAME, "queue");
+ queueAttributes.put(Queue.DURABLE, false);
+ queueAttributes.put(Queue.TYPE, "standard");
+ _queue = _virtualHost.createChild(Queue.class, queueAttributes);
+
+ final AMQPConnection connection = mock(AMQPConnection.class);
+ when(connection.getPrincipal()).thenReturn("test-principal");
+ when(connection.getRemoteAddress()).thenReturn("test-remote-address");
+
+ final AMQPSession session = mock(AMQPSession.class);
+ when(session.getId()).thenReturn(UUID.randomUUID());
+ when(session.getName()).thenReturn("test-session");
+ when(session.getAMQPConnection()).thenReturn(connection);
+
+ _link = mock(PublishingLink.class);
+ when(_link.getName()).thenReturn("test-link");
+ when(_link.getType()).thenReturn(PublishingLink.TYPE_LINK);
+
+ _sender = mock(MessageSender.class);
+ }
+
+ @Test
+ public void addAndRemoveLinkToExchange()
+ {
+ assertEquals(0, _exchange.getProducerCount());
+ _exchange.linkAdded(_sender, _link);
+ assertEquals(1, _exchange.getProducerCount());
+ _exchange.linkRemoved(_sender, _link);
+ assertEquals(0, _exchange.getProducerCount());
+ }
+
+ @Test
+ public void addAndRemoveLinkToQueue()
+ {
+ assertEquals(0, _queue.getProducerCount());
+ _queue.linkAdded(_sender, _link);
+ assertEquals(1, _queue.getProducerCount());
+ _queue.linkRemoved(_sender, _link);
+ assertEquals(0, _queue.getProducerCount());
+ }
+
+ @Test
+ public void closeExchange()
+ {
+ final Map<String,Object> attributes = new HashMap<>();
+ attributes.put(Exchange.NAME, "temporary");
+ attributes.put(Exchange.DURABLE, false);
+ attributes.put(Exchange.TYPE, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+
+ final Exchange exchange = _virtualHost.createChild(Exchange.class,
attributes);
+ exchange.linkAdded(_sender, _link);
+ exchange.close();
+
+ assertEquals(0, exchange.getProducerCount());
+ }
+
+ @Test
+ public void closeQueue()
+ {
+ final Map<String,Object> attributes = new HashMap<>();
+ attributes.put(Queue.NAME, "temporary");
+ attributes.put(Queue.DURABLE, false);
+ attributes.put(Queue.TYPE, "standard");
+
+ final Queue queue = _virtualHost.createChild(Queue.class, attributes);
+ queue.linkAdded(_sender, _link);
+ queue.close();
+
+ assertEquals(0, queue.getProducerCount());
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 9724850a67..73b2c86dc2 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,10 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
@@ -46,10 +43,10 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Producer;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.MessageFormatRegistry;
-import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -71,7 +68,6 @@ import
org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.AsyncCommand;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -93,6 +89,8 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
private volatile ReceivingDestination _receivingDestination;
private volatile boolean _rejectedOutcomeSupportedBySource;
+ private Producer<?> _producer;
+
private final PublishingLink _publishingLink = new PublishingLink()
{
@Override
@@ -148,7 +146,6 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
setCreditWindow();
}
-
private TerminusDurability getDurability()
{
return getTarget().getDurable();
@@ -285,6 +282,10 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
transaction,
session.getSecurityToken());
outcome = ACCEPTED;
+ if (_producer != null)
+ {
+
_producer.registerMessageDelivered(serverMessage.getSizeIncludingHeader());
+ }
}
catch (UnroutableMessageException e)
{
@@ -500,14 +501,16 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
{
if (_receivingDestination != null &&
_receivingDestination.getMessageDestination() != null)
{
+ getSession().removeProducer(_publishingLink);
+ _producer = null;
_receivingDestination.getMessageDestination().linkRemoved(_messageSender,
_publishingLink);
}
_receivingDestination = receivingDestination;
- if(receivingDestination != null &&
receivingDestination.getMessageDestination() != null)
+ if (receivingDestination != null &&
receivingDestination.getMessageDestination() != null)
{
+ _producer = getSession().addProducer(_publishingLink,
receivingDestination.getMessageDestination());
receivingDestination.getMessageDestination().linkAdded(_messageSender,
_publishingLink);
}
-
}
}
diff --git
a/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml
b/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml
index dc1a544bf5..4adc5990bd 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-Management-Managing-Entities.xml
@@ -87,6 +87,7 @@
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude"
href="management/managing/Java-Broker-Management-Managing-Exchanges.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude"
href="management/managing/Java-Broker-Management-Managing-Queues.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude"
href="management/managing/Java-Broker-Management-Managing-Consumers.xml"/>
+ <xi:include xmlns:xi="http://www.w3.org/2001/XInclude"
href="management/managing/Java-Broker-Management-Managing-Producers.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude"
href="management/managing/Java-Broker-Management-Managing-Ports.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude"
href="management/managing/Java-Broker-Management-Managing-Authentication-Providers.xml"/>
<xi:include xmlns:xi="http://www.w3.org/2001/XInclude"
href="management/managing/Java-Broker-Management-Managing-Keystores.xml"/>
diff --git
a/doc/java-broker/src/docbkx/management/managing/Java-Broker-Management-Managing-Producers.xml
b/doc/java-broker/src/docbkx/management/managing/Java-Broker-Management-Managing-Producers.xml
new file mode 100644
index 0000000000..478fa7c103
--- /dev/null
+++
b/doc/java-broker/src/docbkx/management/managing/Java-Broker-Management-Managing-Producers.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<section xmlns="http://docbook.org/ns/docbook" version="5.0"
xml:id="Java-Broker-Management-Managing-Producers">
+ <title>Producers</title>
+ <para>A Producers represents an application sending messages to an exchange
or a queue. Its presence
+ in the model indicates that an application is currently connected to the
exchange or to the queue <emphasis>at this moment</emphasis>.
+ Producers are created when using AMQP 1.0 protocol.
+ </para>
+</section>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]