This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/master by this push:
new 00f976b QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers
00f976b is described below
commit 00f976b1fcc6db515cd3c0a0e4b22bc3a34edfec
Author: Alex Rudyy <[email protected]>
AuthorDate: Sun Dec 16 22:39:35 2018 +0000
QPID-6948: [Broker-J] Fix relationship between model v6.1 session and
consumers
---
.../java/org/apache/qpid/server/model/Session.java | 7 +++
.../apache/qpid/server/queue/AbstractQueue.java | 4 +-
.../apache/qpid/server/session/AMQPSession.java | 5 +-
.../qpid/server/session/AbstractAMQPSession.java | 16 +++++-
.../v6_1/category/SessionController.java | 60 +++++++---------------
.../v6_1/category/DestinationControllerTest.java | 1 -
.../v6_1/category/SessionControllerTest.java | 26 ++++------
7 files changed, 56 insertions(+), 63 deletions(-)
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index ccb396a..af1084e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.model;
+import java.util.Set;
+
@ManagedObject( creatable = false, amqpName = "org.apache.qpid.Session")
public interface Session<X extends Session<X>> extends ConfiguredObject<X>
{
@@ -58,4 +60,9 @@ public interface Session<X extends Session<X>> extends
ConfiguredObject<X>
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.MESSAGES, label = "Prefetched")
long getUnacknowledgedMessages();
+
+ @ManagedOperation(nonModifying = true,
+ changesConfiguredObjectState = false,
+ skipAclCheck = true)
+ Set<? extends Consumer<?, ?>> getConsumers();
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index ba98f1a..84a4700 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1050,7 +1050,7 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
- session.incConsumerCount();
+ session.consumerAdded(consumer);
addChangeListener(new AbstractConfigurationChangeListener()
{
@Override
@@ -1058,7 +1058,7 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
{
if (child.equals(consumer))
{
- session.decConsumerCount();
+ session.consumerRemoved(consumer);
removeChangeListener(this);
}
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index d68592b..4603e22 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.transport.AMQPConnection;
@@ -60,9 +61,9 @@ public interface AMQPSession<S extends
org.apache.qpid.server.session.AMQPSessio
@Override
long getConsumerCount();
- void incConsumerCount();
+ void consumerAdded(Consumer<?, X> consumer);
- void decConsumerCount();
+ void consumerRemoved(Consumer<?, X> consumer);
/**
* Return the time the current transaction started.
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index db69dd9..7256444 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -21,11 +21,13 @@
package org.apache.qpid.server.session;
import java.security.AccessControlContext;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,6 +49,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
@@ -77,6 +80,7 @@ public abstract class AbstractAMQPSession<S extends
AbstractAMQPSession<S, X>,
protected final Set<AbstractConsumerTarget> _consumersWithPendingWork =
new ScheduledConsumerTargetSet<>();
private Iterator<AbstractConsumerTarget> _processPendingIterator;
+ private final Set<Consumer<?,X>> _consumers =
ConcurrentHashMap.newKeySet();
protected AbstractAMQPSession(final Connection<?> parent, final int
sessionId)
{
@@ -256,15 +260,23 @@ public abstract class AbstractAMQPSession<S extends
AbstractAMQPSession<S, X>,
}
@Override
- public final void incConsumerCount()
+ public final void consumerAdded(Consumer<?, X> consumer)
{
_consumerCount.incrementAndGet();
+ _consumers.add(consumer);
}
@Override
- public final void decConsumerCount()
+ public final void consumerRemoved(Consumer<?, X> consumer)
{
_consumerCount.decrementAndGet();
+ _consumers.remove(consumer);
+ }
+
+ @Override
+ public Set<? extends Consumer<?, ?>> getConsumers()
+ {
+ return Collections.unmodifiableSet(_consumers);
}
protected abstract void updateBlockedStateIfNecessary();
diff --git
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
index 5177b1e..a279e7c 100644
---
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
+++
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.management.plugin.controller.v6_1.category;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
import java.util.Set;
-import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.qpid.server.management.plugin.ManagementException;
+import org.apache.qpid.server.management.plugin.ManagementResponse;
+import org.apache.qpid.server.management.plugin.ResponseType;
import
org.apache.qpid.server.management.plugin.controller.GenericLegacyConfiguredObject;
import
org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject;
import
org.apache.qpid.server.management.plugin.controller.LegacyManagementController;
@@ -60,52 +62,28 @@ public class SessionController extends
LegacyCategoryController
}
@Override
- @SuppressWarnings("unchecked")
public Collection<LegacyConfiguredObject> getChildren(final String
category)
{
if (ConsumerController.TYPE.equalsIgnoreCase(category))
{
final LegacyConfiguredObject nextVersionSession =
getNextVersionLegacyConfiguredObject();
- final LegacyConfiguredObject connection =
-
nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION);
- final LegacyConfiguredObject vh =
connection.getParent(VirtualHostController.TYPE);
- final UUID sessionID = (UUID) getAttribute(ID);
- final UUID connectionID = (UUID) connection.getAttribute(ID);
- final List<LegacyConfiguredObject> consumers = new
ArrayList<>();
- final Collection<LegacyConfiguredObject> queues =
vh.getChildren(QueueController.TYPE);
- if (queues != null)
+ final ManagementResponse result =
+ nextVersionSession.invoke("getConsumers",
Collections.emptyMap(), true);
+ if (result != null && result.getResponseCode() == 200 &&
result.getType() == ResponseType.MODEL_OBJECT)
{
- queues.forEach(q -> {
- final Collection<LegacyConfiguredObject>
queueConsumers =
- q.getChildren(ConsumerController.TYPE);
- if (queueConsumers != null)
- {
- queueConsumers.stream()
- .filter(c -> sameSession(c,
sessionID, connectionID))
- .map(c ->
getManagementController().convertFromNextVersion(c))
- .forEach(consumers::add);
- }
- });
+ final Object objects = result.getBody();
+ if (objects instanceof Collection)
+ {
+ return ((Collection<?>) objects).stream().filter(o ->
o instanceof LegacyConfiguredObject)
+ .map(o ->
(LegacyConfiguredObject)o)
+ .map(o ->
getManagementController().convertFromNextVersion(o))
+
.collect(Collectors.toList());
+ }
}
- return consumers;
+ throw
ManagementException.createInternalServerErrorManagementException(
+ "Unexpected result of performing operation
Session#getConsumers()");
}
return super.getChildren(category);
}
-
- private boolean sameSession(final LegacyConfiguredObject consumer,
- final UUID sessionID,
- final UUID connectionID)
- {
- LegacyConfiguredObject session = (LegacyConfiguredObject)
consumer.getAttribute("session");
- if (session != null)
- {
- if (sessionID.equals(session.getAttribute(ID)))
- {
- LegacyConfiguredObject con =
session.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION);
- return con != null &&
connectionID.equals(con.getAttribute(ID));
- }
- }
- return false;
- }
}
-}
\ No newline at end of file
+}
diff --git
a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
index 4c76ccf..d075217 100644
---
a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
+++
b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.when;
import java.util.Arrays;
diff --git
a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
index 77b5924..06cc42b 100644
---
a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
+++
b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -35,6 +36,9 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.management.plugin.ManagementController;
+import org.apache.qpid.server.management.plugin.ManagementResponse;
+import org.apache.qpid.server.management.plugin.ResponseType;
+import
org.apache.qpid.server.management.plugin.controller.ControllerManagementResponse;
import
org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject;
import
org.apache.qpid.server.management.plugin.controller.LegacyManagementController;
import org.apache.qpid.test.utils.UnitTestBase;
@@ -58,34 +62,26 @@ public class SessionControllerTest extends UnitTestBase
public void convertNextVersionLegacyConfiguredObject()
{
final UUID sessionID = UUID.randomUUID();
- final UUID connectionID = UUID.randomUUID();
final LegacyConfiguredObject nextVersionSession =
mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject nextVersionConnection =
mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject nextVersionVirtualHost =
mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject nextVersionQueue =
mock(LegacyConfiguredObject.class);
final LegacyConfiguredObject nextVersionConsumer =
mock(LegacyConfiguredObject.class);
when(nextVersionSession.getCategory()).thenReturn(SessionController.TYPE);
-
when(nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION)).thenReturn(nextVersionConnection);
when(nextVersionSession.getAttribute(LegacyConfiguredObject.ID)).thenReturn(sessionID);
-
when(nextVersionConnection.getParent(VirtualHostController.TYPE)).thenReturn(nextVersionVirtualHost);
-
when(nextVersionConnection.getAttribute(LegacyConfiguredObject.ID)).thenReturn(connectionID);
-
-
when(nextVersionVirtualHost.getChildren(QueueController.TYPE)).thenReturn(Collections.singletonList(nextVersionQueue));
-
when(nextVersionQueue.getChildren(ConsumerController.TYPE)).thenReturn(Collections.singletonList(nextVersionConsumer));
-
when(nextVersionConsumer.getAttribute("session")).thenReturn(nextVersionSession);
+ final ManagementResponse operationResult = new
ControllerManagementResponse(ResponseType.MODEL_OBJECT,
+
Collections.singletonList(
+
nextVersionConsumer));
+ when(nextVersionSession.invoke(eq("getConsumers"),
eq(Collections.emptyMap()), eq(true))).thenReturn(
+ operationResult);
final LegacyConfiguredObject convertedConsumer =
mock(LegacyConfiguredObject.class);
- final LegacyConfiguredObject convertedConnection =
mock(LegacyConfiguredObject.class);
when(_legacyManagementController.convertFromNextVersion(nextVersionConsumer)).thenReturn(convertedConsumer);
-
when(_legacyManagementController.convertFromNextVersion(nextVersionConnection)).thenReturn(convertedConnection);
- final LegacyConfiguredObject convertedSession =
_sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession);
+ final LegacyConfiguredObject convertedSession =
+
_sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession);
assertThat(convertedSession.getAttribute(LegacyConfiguredObject.ID),
is(equalTo(sessionID)));
-
assertThat(convertedSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION),
is(equalTo(convertedConnection)));
final Collection<LegacyConfiguredObject> consumers =
convertedSession.getChildren(ConsumerController.TYPE);
assertThat(consumers, is(notNullValue()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]