Repository: qpid-broker-j Updated Branches: refs/heads/master 498ba2cfb -> 0598f7ebc
QPID-7842: Disallow sending of a transfer with not unique delivery tag amongst all deliveries that could be considered unsettled by either end of the link Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0598f7eb Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0598f7eb Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0598f7eb Branch: refs/heads/master Commit: 0598f7ebc982a2c534be960a8c7c06ab4bfa1002 Parents: 498ba2c Author: Alex Rudyy <[email protected]> Authored: Thu Jun 29 13:37:31 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Thu Jun 29 13:37:31 2017 +0100 ---------------------------------------------------------------------- .../v1_0/AbstractReceivingLinkEndpoint.java | 8 +++ .../v1_0/type/ErrorCarryingFrameBody.java | 28 +++++++++ .../protocol/v1_0/type/transport/Close.java | 4 +- .../protocol/v1_0/type/transport/Detach.java | 4 +- .../protocol/v1_0/type/transport/End.java | 4 +- .../protocol/v1_0/messaging/TransferTest.java | 66 +++++++++++++++++++- 6 files changed, 107 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java index d159de5..e87fffb 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java @@ -176,6 +176,14 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend error = new Error(AmqpError.INVALID_FIELD, "Transfer \"delivery-tag\" is required for a new delivery."); } + else if (_unsettled.containsKey(transfer.getDeliveryTag())) + { + error = new Error(AmqpError.ILLEGAL_STATE, + String.format("Delivery-tag '%s' is used by another unsettled delivery." + + " The delivery-tag MUST be unique amongst all deliveries that" + + " could be considered unsettled by either end of the link.", + transfer.getDeliveryTag())); + } return error; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java new file mode 100644 index 0000000..79649b3 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0.type; + +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; + +public interface ErrorCarryingFrameBody extends FrameBody +{ + Error getError(); +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java index 55f700c..5fcee18 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java @@ -25,9 +25,9 @@ package org.apache.qpid.server.protocol.v1_0.type.transport; import org.apache.qpid.server.protocol.v1_0.ConnectionHandler; import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField; -import org.apache.qpid.server.protocol.v1_0.type.FrameBody; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; -public class Close implements FrameBody +public class Close implements ErrorCarryingFrameBody { @CompositeTypeField http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java index 02fb1d8..bccc719 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java @@ -25,10 +25,10 @@ package org.apache.qpid.server.protocol.v1_0.type.transport; import org.apache.qpid.server.protocol.v1_0.ConnectionHandler; import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField; -import org.apache.qpid.server.protocol.v1_0.type.FrameBody; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; -public class Detach implements FrameBody +public class Detach implements ErrorCarryingFrameBody { @CompositeTypeField(mandatory = true) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java index ca8b375..6c60f1b 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java @@ -25,9 +25,9 @@ package org.apache.qpid.server.protocol.v1_0.type.transport; import org.apache.qpid.server.protocol.v1_0.ConnectionHandler; import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField; -import org.apache.qpid.server.protocol.v1_0.type.FrameBody; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; -public class End implements FrameBody +public class End implements ErrorCarryingFrameBody { @CompositeTypeField http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 6ff5d5f..0d375e4 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -25,7 +25,10 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.fail; import static org.junit.Assume.assumeThat; import java.net.InetSocketAddress; @@ -39,6 +42,8 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody; import org.apache.qpid.server.protocol.v1_0.type.Outcome; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; @@ -70,7 +75,7 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; public class TransferTest extends ProtocolTestBase { - public static final String TEST_MESSAGE_DATA = "foo"; + private static final String TEST_MESSAGE_DATA = "foo"; private InetSocketAddress _brokerAddress; private String _originalMmsMessageStorePersistence; @@ -650,4 +655,63 @@ public class TransferTest extends ProtocolTestBase } } + + @Test + @SpecificationTest(section = "2.7.5", + description = "[delivery-tag] uniquely identifies the delivery attempt for a given message on this link.") + public void transfersWithDuplicateDeliveryTag() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8)); + + Interaction interaction = transport.newInteraction(); + interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachRcvSettleMode(ReceiverSettleMode.SECOND) + .attach().consumeResponse(Attach.class) + .consumeResponse(Flow.class); + + Flow flow = interaction.getLatestResponse(Flow.class); + assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1))); + + interaction.transferDeliveryId(UnsignedInteger.ZERO) + .transferDeliveryTag(deliveryTag) + .transferPayloadData("test") + .transfer() + + .transferDeliveryTag(deliveryTag) + .transferDeliveryId(UnsignedInteger.ONE) + .transferPayloadData("test2") + .transfer(); + + do + { + interaction.consumeResponse(); + Response<?> response = interaction.getLatestResponse(); + assertThat(response, is(notNullValue())); + + Object body = response.getBody(); + if (body instanceof ErrorCarryingFrameBody) + { + Error error = ((ErrorCarryingFrameBody) body).getError(); + assertThat(error, is(notNullValue())); + break; + } + else if (body instanceof Disposition) + { + Disposition disposition = (Disposition)body; + assertThat(disposition.getSettled(), is(equalTo(false))); + assertThat(disposition.getFirst(), is(not(equalTo(UnsignedInteger.ONE)))); + assertThat(disposition.getLast(), is(not(equalTo(UnsignedInteger.ONE)))); + } + else if (!(body instanceof Flow)) + { + fail("Unexpected response " + body); + } + } while (true); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
