This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit af83b6db0463d095f626592e15a9e4f5297281f7 Author: Alex Rudyy <[email protected]> AuthorDate: Thu Aug 22 13:01:42 2019 +0100 QPID-8349: [Tests][AMQP 1.0] Fix ExistingQueueAdmin --- .../tests/protocol/v1_0/ExistingQueueAdmin.java | 99 +++++++++------------- 1 file changed, 41 insertions(+), 58 deletions(-) diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java index 313c4ff..d789b1a 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java @@ -31,8 +31,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; -import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; -import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; @@ -48,6 +46,7 @@ public class ExistingQueueAdmin implements QueueAdmin { private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueAdmin.class); private static final String ADMIN_LINK_NAME = "existingQueueAdminLink"; + private static final int DRAIN_CREDITS = 1000; @Override public void createQueue(final BrokerAdmin brokerAdmin, final String queueName) @@ -58,14 +57,7 @@ public class ExistingQueueAdmin implements QueueAdmin @Override public void deleteQueue(final BrokerAdmin brokerAdmin, final String queueName) { - try - { - drainQueue(brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP), queueName); - } - catch (Exception e) - { - throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e); - } + drain(queueName, brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP)); } @Override @@ -96,6 +88,18 @@ public class ExistingQueueAdmin implements QueueAdmin return true; } + private void drain(final String queueName, final InetSocketAddress brokerAddress) + { + try + { + drainQueue(brokerAddress, queueName); + } + catch (Exception e) + { + throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e); + } + } + private void putMessageOnQueue(final InetSocketAddress brokerAddress, final String queueName, final String... message) throws Exception @@ -133,10 +137,9 @@ public class ExistingQueueAdmin implements QueueAdmin { interaction.detachClose(true) .detach() - .consumeResponse(Detach.class) .end() - .consumeResponse(End.class) - .doCloseConnection(); + .close() + .sync(); } @@ -152,73 +155,53 @@ public class ExistingQueueAdmin implements QueueAdmin .attachRole(Role.RECEIVER) .attachSndSettleMode(SenderSettleMode.SETTLED) .attachSourceAddress(queueName) - .attach().consumeResponse(); - + .attach().consumeResponse(Attach.class) + .flowIncomingWindow(UnsignedInteger.MAX_VALUE) + .flowNextIncomingId(interaction.getCachedResponse(Begin.class).getNextOutgoingId()) + .flowLinkCredit(UnsignedInteger.valueOf(DRAIN_CREDITS)) + .flowHandleFromLinkHandle() + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowDrain(Boolean.TRUE) + .flow(); boolean received; - final Begin begin = interaction.getCachedResponse(Begin.class); - int nextIncomingId = begin.getNextOutgoingId().intValue(); do { - received = receive(interaction, queueName, nextIncomingId); - nextIncomingId++; + received = receive(interaction, queueName); } while (received); closeInteraction(interaction); } } - private boolean receive(final Interaction interaction, String queueName, int nextIncomingId) throws Exception + private boolean receive(final Interaction interaction, String queueName) throws Exception { - interaction.flowIncomingWindow(UnsignedInteger.MAX_VALUE) - .flowNextIncomingId(UnsignedInteger.valueOf(nextIncomingId)) - .flowLinkCredit(UnsignedInteger.ONE) - .flowDrain(Boolean.TRUE) - .flowHandleFromLinkHandle() - .flowOutgoingWindow(UnsignedInteger.ZERO) - .flowNextOutgoingId(UnsignedInteger.ZERO) - .flow(); - + boolean transferExpected; boolean messageReceived = false; - boolean flowReceived = false; do { - Response<?> latestResponse; - try - { - latestResponse = interaction.consumeResponse(Transfer.class, Flow.class).getLatestResponse(); - } - catch (IllegalStateException e) - { - if (messageReceived) - { - LOGGER.debug( - "Message was received on draining queue '{}' but flow was not. Assuming successful receive...", - queueName, - e); - } - else - { - LOGGER.warn( - "Neither message no flow was received on draining queue '{}'. Assuming no messages on the queue...", - queueName, - e); - } - return messageReceived; - } - if (latestResponse.getBody() instanceof Transfer) + final Response<?> latestResponse = + interaction.consumeResponse(Transfer.class, Flow.class, null).getLatestResponse(); + if (latestResponse != null && latestResponse.getBody() instanceof Transfer) { Transfer responseTransfer = (Transfer) latestResponse.getBody(); - if (!Boolean.TRUE.equals(responseTransfer.getMore())) + transferExpected = Boolean.TRUE.equals(responseTransfer.getMore()); + if (!transferExpected) { messageReceived = true; } } - else if (latestResponse.getBody() instanceof Flow) + else if (latestResponse != null && latestResponse.getBody() instanceof Flow) + { + transferExpected = false; + } + else { - flowReceived = true; + LOGGER.warn("Neither transfer no flow was received from '{}'. Assuming no messages left...", queueName); + transferExpected = false; } } - while (!flowReceived); + while (transferExpected); return messageReceived; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
