This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/master by this push: new 00f976b QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers 00f976b is described below commit 00f976b1fcc6db515cd3c0a0e4b22bc3a34edfec Author: Alex Rudyy <oru...@apache.org> AuthorDate: Sun Dec 16 22:39:35 2018 +0000 QPID-6948: [Broker-J] Fix relationship between model v6.1 session and consumers --- .../java/org/apache/qpid/server/model/Session.java | 7 +++ .../apache/qpid/server/queue/AbstractQueue.java | 4 +- .../apache/qpid/server/session/AMQPSession.java | 5 +- .../qpid/server/session/AbstractAMQPSession.java | 16 +++++- .../v6_1/category/SessionController.java | 60 +++++++--------------- .../v6_1/category/DestinationControllerTest.java | 1 - .../v6_1/category/SessionControllerTest.java | 26 ++++------ 7 files changed, 56 insertions(+), 63 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java index ccb396a..af1084e 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/Session.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/Session.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.model; +import java.util.Set; + @ManagedObject( creatable = false, amqpName = "org.apache.qpid.Session") public interface Session<X extends Session<X>> extends ConfiguredObject<X> { @@ -58,4 +60,9 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X> @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetched") long getUnacknowledgedMessages(); + + @ManagedOperation(nonModifying = true, + changesConfiguredObjectState = false, + skipAclCheck = true) + Set<? 
extends Consumer<?, ?>> getConsumers(); } diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index ba98f1a..84a4700 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -1050,7 +1050,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> childAdded(consumer); consumer.addChangeListener(_deletedChildListener); - session.incConsumerCount(); + session.consumerAdded(consumer); addChangeListener(new AbstractConfigurationChangeListener() { @Override @@ -1058,7 +1058,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { if (child.equals(consumer)) { - session.decConsumerCount(); + session.consumerRemoved(consumer); removeChangeListener(this); } } diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java index d68592b..4603e22 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java +++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.logging.EventLoggerProvider; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.transport.AMQPConnection; @@ -60,9 +61,9 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio @Override long getConsumerCount(); - void incConsumerCount(); + void consumerAdded(Consumer<?, X> consumer); - void decConsumerCount(); + void consumerRemoved(Consumer<?, X> consumer); /** * Return the time 
the current transaction started. diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java index db69dd9..7256444 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java +++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java @@ -21,11 +21,13 @@ package org.apache.qpid.server.session; import java.security.AccessControlContext; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -47,6 +49,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; @@ -77,6 +80,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>(); private Iterator<AbstractConsumerTarget> _processPendingIterator; + private final Set<Consumer<?,X>> _consumers = ConcurrentHashMap.newKeySet(); protected AbstractAMQPSession(final Connection<?> parent, final int sessionId) { @@ -256,15 +260,23 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, } @Override - public final void incConsumerCount() + public final void consumerAdded(Consumer<?, X> consumer) { _consumerCount.incrementAndGet(); + _consumers.add(consumer); } @Override - public final void decConsumerCount() + public 
final void consumerRemoved(Consumer<?, X> consumer) { _consumerCount.decrementAndGet(); + _consumers.remove(consumer); + } + + @Override + public Set<? extends Consumer<?, ?>> getConsumers() + { + return Collections.unmodifiableSet(_consumers); } protected abstract void updateBlockedStateIfNecessary(); diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java index 5177b1e..a279e7c 100644 --- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java +++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionController.java @@ -20,12 +20,14 @@ */ package org.apache.qpid.server.management.plugin.controller.v6_1.category; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.Collections; import java.util.Set; -import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.qpid.server.management.plugin.ManagementException; +import org.apache.qpid.server.management.plugin.ManagementResponse; +import org.apache.qpid.server.management.plugin.ResponseType; import org.apache.qpid.server.management.plugin.controller.GenericLegacyConfiguredObject; import org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject; import org.apache.qpid.server.management.plugin.controller.LegacyManagementController; @@ -60,52 +62,28 @@ public class SessionController extends LegacyCategoryController } @Override - @SuppressWarnings("unchecked") public Collection<LegacyConfiguredObject> getChildren(final String category) { if (ConsumerController.TYPE.equalsIgnoreCase(category)) { final LegacyConfiguredObject nextVersionSession = 
getNextVersionLegacyConfiguredObject(); - final LegacyConfiguredObject connection = - nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION); - final LegacyConfiguredObject vh = connection.getParent(VirtualHostController.TYPE); - final UUID sessionID = (UUID) getAttribute(ID); - final UUID connectionID = (UUID) connection.getAttribute(ID); - final List<LegacyConfiguredObject> consumers = new ArrayList<>(); - final Collection<LegacyConfiguredObject> queues = vh.getChildren(QueueController.TYPE); - if (queues != null) + final ManagementResponse result = + nextVersionSession.invoke("getConsumers", Collections.emptyMap(), true); + if (result != null && result.getResponseCode() == 200 && result.getType() == ResponseType.MODEL_OBJECT) { - queues.forEach(q -> { - final Collection<LegacyConfiguredObject> queueConsumers = - q.getChildren(ConsumerController.TYPE); - if (queueConsumers != null) - { - queueConsumers.stream() - .filter(c -> sameSession(c, sessionID, connectionID)) - .map(c -> getManagementController().convertFromNextVersion(c)) - .forEach(consumers::add); - } - }); + final Object objects = result.getBody(); + if (objects instanceof Collection) + { + return ((Collection<?>) objects).stream().filter(o -> o instanceof LegacyConfiguredObject) + .map(o -> (LegacyConfiguredObject)o) + .map(o -> getManagementController().convertFromNextVersion(o)) + .collect(Collectors.toList()); + } } - return consumers; + throw ManagementException.createInternalServerErrorManagementException( + "Unexpected result of performing operation Session#getConsumers()"); } return super.getChildren(category); } - - private boolean sameSession(final LegacyConfiguredObject consumer, - final UUID sessionID, - final UUID connectionID) - { - LegacyConfiguredObject session = (LegacyConfiguredObject) consumer.getAttribute("session"); - if (session != null) - { - if (sessionID.equals(session.getAttribute(ID))) - { - LegacyConfiguredObject con = 
session.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION); - return con != null && connectionID.equals(con.getAttribute(ID)); - } - } - return false; - } } -} \ No newline at end of file +} diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java index 4c76ccf..d075217 100644 --- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java +++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/DestinationControllerTest.java @@ -27,7 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockingDetails; import static org.mockito.Mockito.when; import java.util.Arrays; diff --git a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java index 77b5924..06cc42b 100644 --- a/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java +++ b/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/controller/v6_1/category/SessionControllerTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertThat; +import static 
org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,6 +36,9 @@ import org.junit.Before; import org.junit.Test; import org.apache.qpid.server.management.plugin.ManagementController; +import org.apache.qpid.server.management.plugin.ManagementResponse; +import org.apache.qpid.server.management.plugin.ResponseType; +import org.apache.qpid.server.management.plugin.controller.ControllerManagementResponse; import org.apache.qpid.server.management.plugin.controller.LegacyConfiguredObject; import org.apache.qpid.server.management.plugin.controller.LegacyManagementController; import org.apache.qpid.test.utils.UnitTestBase; @@ -58,34 +62,26 @@ public class SessionControllerTest extends UnitTestBase public void convertNextVersionLegacyConfiguredObject() { final UUID sessionID = UUID.randomUUID(); - final UUID connectionID = UUID.randomUUID(); final LegacyConfiguredObject nextVersionSession = mock(LegacyConfiguredObject.class); - final LegacyConfiguredObject nextVersionConnection = mock(LegacyConfiguredObject.class); - final LegacyConfiguredObject nextVersionVirtualHost = mock(LegacyConfiguredObject.class); - final LegacyConfiguredObject nextVersionQueue = mock(LegacyConfiguredObject.class); final LegacyConfiguredObject nextVersionConsumer = mock(LegacyConfiguredObject.class); when(nextVersionSession.getCategory()).thenReturn(SessionController.TYPE); - when(nextVersionSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION)).thenReturn(nextVersionConnection); when(nextVersionSession.getAttribute(LegacyConfiguredObject.ID)).thenReturn(sessionID); - when(nextVersionConnection.getParent(VirtualHostController.TYPE)).thenReturn(nextVersionVirtualHost); - when(nextVersionConnection.getAttribute(LegacyConfiguredObject.ID)).thenReturn(connectionID); - - when(nextVersionVirtualHost.getChildren(QueueController.TYPE)).thenReturn(Collections.singletonList(nextVersionQueue)); - 
when(nextVersionQueue.getChildren(ConsumerController.TYPE)).thenReturn(Collections.singletonList(nextVersionConsumer)); - when(nextVersionConsumer.getAttribute("session")).thenReturn(nextVersionSession); + final ManagementResponse operationResult = new ControllerManagementResponse(ResponseType.MODEL_OBJECT, + Collections.singletonList( + nextVersionConsumer)); + when(nextVersionSession.invoke(eq("getConsumers"), eq(Collections.emptyMap()), eq(true))).thenReturn( + operationResult); final LegacyConfiguredObject convertedConsumer = mock(LegacyConfiguredObject.class); - final LegacyConfiguredObject convertedConnection = mock(LegacyConfiguredObject.class); when(_legacyManagementController.convertFromNextVersion(nextVersionConsumer)).thenReturn(convertedConsumer); - when(_legacyManagementController.convertFromNextVersion(nextVersionConnection)).thenReturn(convertedConnection); - final LegacyConfiguredObject convertedSession = _sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession); + final LegacyConfiguredObject convertedSession = + _sessionController.convertNextVersionLegacyConfiguredObject(nextVersionSession); assertThat(convertedSession.getAttribute(LegacyConfiguredObject.ID), is(equalTo(sessionID))); - assertThat(convertedSession.getParent(LegacyCategoryControllerFactory.CATEGORY_CONNECTION), is(equalTo(convertedConnection))); final Collection<LegacyConfiguredObject> consumers = convertedSession.getChildren(ConsumerController.TYPE); assertThat(consumers, is(notNullValue())); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org