Repository: qpid-broker-j Updated Branches: refs/heads/master 377315ba5 -> 97ebcc8ef
QPID-8058: [Broker-J][AMQP 1.0] Fix draining of temporary message sources on management node Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/97ebcc8e Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/97ebcc8e Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/97ebcc8e Branch: refs/heads/master Commit: 97ebcc8ef5f0a84a42af5d2eede374d27f2e61c6 Parents: 377315b Author: Alex Rudyy <[email protected]> Authored: Fri Dec 8 15:25:17 2017 +0000 Committer: Alex Rudyy <[email protected]> Committed: Fri Dec 8 15:25:17 2017 +0000 ---------------------------------------------------------------------- .../management/amqp/ManagementNodeConsumer.java | 4 + systests/protocol-tests-amqp-1-0/pom.xml | 5 +- .../extensions/management/ManagementTest.java | 131 +++++++++++++++++++ .../apache/qpid/tests/utils/BrokerAdmin.java | 1 + .../utils/EmbeddedBrokerPerClassAdminImpl.java | 6 + .../utils/ExternalQpidBrokerAdminImpl.java | 6 + 6 files changed, 152 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java ---------------------------------------------------------------------- diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 7a272fa..0662ea5 100644 --- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -93,6 +93,10 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc return new MessageContainer(managementResponse, managementResponse.getMessageReference()); } } + else + { + _target.noMessagesAvailable(); + } return null; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/protocol-tests-amqp-1-0/pom.xml ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml index aa25351..ee5bd9f 100644 --- a/systests/protocol-tests-amqp-1-0/pom.xml +++ b/systests/protocol-tests-amqp-1-0/pom.xml @@ -45,7 +45,10 @@ <groupId>org.apache.qpid</groupId> <artifactId>qpid-broker-codegen</artifactId> </dependency> - + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-plugins-management-amqp</artifactId> + </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-test-utils</artifactId> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java new file mode 100644 index 0000000..0025fb5 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tests.protocol.v1_0.extensions.management; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assume.assumeThat; + +import java.net.InetSocketAddress; +import java.util.Collections; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.protocol.v1_0.Session_1_0; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; +import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.tests.protocol.SpecificationTest; +import org.apache.qpid.tests.protocol.v1_0.FrameTransport; +import org.apache.qpid.tests.protocol.v1_0.Interaction; +import org.apache.qpid.tests.protocol.v1_0.Utils; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; + +public class ManagementTest extends BrokerAdminUsingTestBase +{ + private InetSocketAddress _brokerAddress; + + @Before + public void setUp() + { + _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + } + + @Test + @SpecificationTest(section = "2.6.7", + description = "The drain flag indicates how the sender SHOULD behave when insufficient messages" + + " are available to consume the current link-credit. If set, the sender will" + + " (after sending all available messages) advance the delivery-count as much as possible," + + " consuming all link-credit, and send the flow state to the receiver.") + public void drainTemporaryMessageSource() throws Exception + { + assumeThat(getBrokerAdmin().isManagementSupported(), is(equalTo(true))); + + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + Target target = new Target(); + target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose())); + target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + target.setDynamic(true); + target.setCapabilities(new Symbol[]{Symbol.valueOf("temporary-queue")}); + + final Interaction interaction = transport.newInteraction(); + final Attach attachResponse = interaction.negotiateProtocol().consumeResponse() + .openHostname("$management") + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachTarget(target) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + + assertThat(attachResponse.getSource(), is(notNullValue())); + assertThat(attachResponse.getTarget(), is(notNullValue())); + + String newTemporaryNodeAddress = ((Target) attachResponse.getTarget()).getAddress(); + assertThat(newTemporaryNodeAddress, is(notNullValue())); + + interaction.consumeResponse().getLatestResponse(Flow.class); + + final Attach receiverResponse = interaction.attachHandle(UnsignedInteger.ONE).attachRole(Role.RECEIVER) + .attachSourceAddress(newTemporaryNodeAddress) + .attachRcvSettleMode(ReceiverSettleMode.FIRST) + .attach().consumeResponse().getLatestResponse(Attach.class); + + assertThat(receiverResponse.getSource(), is(instanceOf(Source.class))); + assertThat(((Source)receiverResponse.getSource()).getAddress(), is(equalTo(newTemporaryNodeAddress))); + + // 2.6.8 Synchronous Get + + // grant credit of 1 + interaction.flowIncomingWindow(UnsignedInteger.ONE) + .flowNextIncomingId(UnsignedInteger.ZERO) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.ONE) + .flowHandleFromLinkHandle() + .flow(); + + // send drain to ensure the sender promptly advances the delivery-count until link-credit is consumed + interaction.flowDrain(true).flow(); + + Flow flow = interaction.consumeResponse().getLatestResponse(Flow.class); + assertThat(flow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO))); + assertThat(flow.getHandle(), is(equalTo(receiverResponse.getHandle()))); + + interaction.doCloseConnection(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java ---------------------------------------------------------------------- diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java index 6875460..3b57431 100644 --- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java +++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java @@ -51,6 +51,7 @@ public interface BrokerAdmin extends Pluggable boolean isSASLMechanismSupported(String mechanismName); boolean isWebSocketSupported(); boolean isQueueDepthSupported(); + boolean isManagementSupported(); String getValidUsername(); String getValidPassword(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java ---------------------------------------------------------------------- diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java index f171f4d..0bb57ce 100644 --- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java +++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java @@ -364,6 +364,12 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin } @Override + public boolean isManagementSupported() + { + return true; + } + + @Override public String getValidUsername() { return "guest"; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/97ebcc8e/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java ---------------------------------------------------------------------- diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java index e935067..f359053 100644 --- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java +++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java @@ -132,6 +132,12 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin } @Override + public boolean isManagementSupported() + { + return false; + } + + @Override public boolean isSASLMechanismSupported(final String mechanismName) { return true; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
