Repository: qpid-broker-j Updated Branches: refs/heads/master b36c7180f -> 82ef9deaa
QPID-7648: [Java Broker] Reject AMQP 1.0 durable messages if no persistent store is configured. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/82ef9dea Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/82ef9dea Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/82ef9dea Branch: refs/heads/master Commit: 82ef9deaa97508998fc0db3e56969242778de4a3 Parents: b36c718 Author: Lorenz Quack <[email protected]> Authored: Mon Jun 12 12:10:43 2017 +0100 Committer: Lorenz Quack <[email protected]> Committed: Mon Jun 12 12:10:43 2017 +0100 ---------------------------------------------------------------------- .../qpid/server/store/MemoryMessageStore.java | 2 +- .../v1_0/StandardReceivingLinkEndpoint.java | 25 +++- pom.xml | 7 +- .../tests/protocol/v1_0/MessageEncoder.java | 20 +-- .../protocol/v1_0/messaging/TransferTest.java | 142 +++++++++++++++++++ 5 files changed, 184 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 0da7db7..d9992ec 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -321,7 +321,7 @@ public class MemoryMessageStore implements MessageStore @Override public boolean isPersistent() { - return false; + return Boolean.parseBoolean(System.getProperty("qpid.tests.mms.messagestore.persistence", "false")); } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java index 331db41..e33bbb6 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java @@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; import org.apache.qpid.server.protocol.v1_0.type.Outcome; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; @@ -237,9 +238,29 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint session.getAMQPConnection() .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId()); - Outcome outcome = getReceivingDestination().send(serverMessage, transaction, - session.getSecurityToken()); + Outcome outcome; Source source = getSource(); + if (serverMessage.isPersistent() && !getAddressSpace().getMessageStore().isPersistent()) + { + final Error preconditionFailedError = new Error(AmqpError.PRECONDITION_FAILED, + "Non-durable message store cannot accept durable message."); + if (source.getOutcomes() != null && Arrays.asList(source.getOutcomes()) + .contains(Rejected.REJECTED_SYMBOL)) + { + final Rejected rejected = new Rejected(); + rejected.setError(preconditionFailedError); + outcome = rejected; + } + else + { + return preconditionFailedError; + } + } + else + { + outcome = getReceivingDestination().send(serverMessage, transaction, + session.getSecurityToken()); + } DeliveryState resultantState; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 45f6fa1..612ba6d 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,7 @@ <profile.test_receive_timeout>1000</profile.test_receive_timeout> <profile.java.naming.factory.initial>org.apache.qpid.jndi.PropertiesFileInitialContextFactory</profile.java.naming.factory.initial> <profile.java.naming.provider.url>test-profiles${file.separator}test-provider.properties</profile.java.naming.provider.url> + <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence> <dollar.sign>$</dollar.sign> <at.sign>@</at.sign> @@ -531,6 +532,7 @@ <test.output.dir>${test.output.dir}</test.output.dir> <broker.clean.between.tests>true</broker.clean.between.tests> <qpid.test_receive_timeout>${profile.test_receive_timeout}</qpid.test_receive_timeout> + <qpid.tests.mms.messagestore.persistence>${profile.qpid.tests.mms.messagestore.persistence}</qpid.tests.mms.messagestore.persistence> <java.naming.factory.initial>${profile.java.naming.factory.initial}</java.naming.factory.initial> <java.naming.provider.url>${profile.java.naming.provider.url}</java.naming.provider.url> </systemPropertyVariables> @@ -732,6 +734,7 @@ <profile.broker.persistent>false</profile.broker.persistent> <profile.virtualhostnode.type>Memory</profile.virtualhostnode.type> <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint> + <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence> </properties> </profile> @@ -751,6 +754,7 @@ <profile.broker.persistent>false</profile.broker.persistent> <profile.virtualhostnode.type>Memory</profile.virtualhostnode.type> <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint> + <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence> </properties> </profile> @@ -770,6 +774,7 @@ <profile.broker.persistent>false</profile.broker.persistent> <profile.virtualhostnode.type>Memory</profile.virtualhostnode.type> <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint> + <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence> </properties> </profile> @@ -965,7 +970,7 @@ <profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint> <profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial> <profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url> - + <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence> </properties> </profile> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java index 78ffe9d..0c77ab1 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java @@ -25,29 +25,33 @@ import java.util.LinkedList; import java.util.List; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; -import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; public class MessageEncoder { - private static final AMQPDescribedTypeRegistry - AMQP_DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer(); + private Header _header; private List<String> _data = new LinkedList<>(); - private SectionEncoder _encoder = new SectionEncoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY); public void addData(final String data) { _data.add(data); } + public void setHeader(Header header) + { + _header = header; + } + public List<QpidByteBuffer> getPayload() { List<QpidByteBuffer> payload = new ArrayList<>(); + if (_header != null) + { + payload.addAll(_header.createEncodingRetainingSection().getEncodedForm()); + } + if (_data.isEmpty()) { throw new IllegalStateException("Message should have at least one data section"); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index c099e20..8d93002 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -32,15 +32,19 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.ListenableFuture; +import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame; import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; @@ -66,14 +70,31 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; public class TransferTest extends ProtocolTestBase { private InetSocketAddress _brokerAddress; + private String _originalMmsMessageStorePersistence; @Before public void setUp() { + _originalMmsMessageStorePersistence = System.getProperty("qpid.tests.mms.messagestore.persistence"); + System.setProperty("qpid.tests.mms.messagestore.persistence", "false"); + getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); } + @After + public void tearDown() + { + if (_originalMmsMessageStorePersistence != null) + { + System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence); + } + else + { + System.clearProperty("qpid.tests.mms.messagestore.persistence"); + } + } + @Test @SpecificationTest(section = "1.3.4", description = "Transfer without mandatory fields should result in a decoding error.") @@ -278,6 +299,8 @@ public class TransferTest extends ProtocolTestBase MessageEncoder messageEncoder = new MessageEncoder(); messageEncoder.addData("foo"); Transfer transfer = new Transfer(); + transfer.setDeliveryId(UnsignedInteger.ONE); + transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8))); transfer.setHandle(linkHandle); transfer.setPayload(messageEncoder.getPayload()); transfer.setSettled(Boolean.TRUE); @@ -324,4 +347,123 @@ public class TransferTest extends ProtocolTestBase assertThat(closeResponse.getFrameBody(), is(instanceOf(Close.class))); } } + + @Test + @SpecificationTest(section = "3.2.1", + description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and " + + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages " + + "where the durable header is set to true: if the source allows the rejected outcome then the " + + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be " + + "detached by the receiver with the same error.") + public void durableTransferWithRejectedOutcome() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress)) + { + final Attach attach = new Attach(); + attach.setName("testLink"); + attach.setRole(Role.SENDER); + final UnsignedInteger linkHandle = UnsignedInteger.ZERO; + attach.setHandle(linkHandle); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + Target target = new Target(); + target.setAddress(BrokerAdmin.TEST_QUEUE_NAME); + attach.setTarget(target); + final Source source = new Source(); + source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL); + attach.setSource(source); + transport.doAttachSendingLink(attach); + + MessageEncoder messageEncoder = new MessageEncoder(); + final Header header = new Header(); + header.setDurable(true); + messageEncoder.setHeader(header); + messageEncoder.addData("test message data."); + Transfer transfer = new Transfer(); + transfer.setDeliveryId(UnsignedInteger.ONE); + transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8))); + transfer.setHandle(linkHandle); + transfer.setPayload(messageEncoder.getPayload()); + + transport.sendPerformative(transfer); + PerformativeResponse response = (PerformativeResponse) transport.getNextResponse(); + + if (getBrokerAdmin().supportsRestart()) + { + assertThat(response, is(notNullValue())); + assertThat(response.getFrameBody(), is(instanceOf(Disposition.class))); + final Disposition receivedDisposition = (Disposition) response.getFrameBody(); + assertThat(receivedDisposition.getSettled(), is(true)); + assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class))); + assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Accepted.ACCEPTED_SYMBOL)); + } + else + { + assertThat(response, is(notNullValue())); + assertThat(response.getFrameBody(), is(instanceOf(Disposition.class))); + final Disposition receivedDisposition = (Disposition) response.getFrameBody(); + assertThat(receivedDisposition.getSettled(), is(true)); + assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class))); + assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Rejected.REJECTED_SYMBOL)); + } + } + } + + @Test + @SpecificationTest(section = "3.2.1", + description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and " + + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages " + + "where the durable header is set to true: if the source allows the rejected outcome then the " + + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be " + + "detached by the receiver with the same error.") + public void durableTransferWithoutRejectedOutcome() throws Exception + { + try (FrameTransport transport = new FrameTransport(_brokerAddress)) + { + final Attach attach = new Attach(); + attach.setName("testLink"); + attach.setRole(Role.SENDER); + final UnsignedInteger linkHandle = UnsignedInteger.ZERO; + attach.setHandle(linkHandle); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + Target target = new Target(); + target.setAddress(BrokerAdmin.TEST_QUEUE_NAME); + attach.setTarget(target); + final Source source = new Source(); + source.setOutcomes(Accepted.ACCEPTED_SYMBOL); + attach.setSource(source); + transport.doAttachSendingLink(attach); + + MessageEncoder messageEncoder = new MessageEncoder(); + final Header header = new Header(); + header.setDurable(true); + messageEncoder.setHeader(header); + messageEncoder.addData("test message data."); + Transfer transfer = new Transfer(); + transfer.setDeliveryId(UnsignedInteger.ONE); + transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8))); + transfer.setHandle(linkHandle); + transfer.setPayload(messageEncoder.getPayload()); + + transport.sendPerformative(transfer); + PerformativeResponse response = (PerformativeResponse) transport.getNextResponse(); + + if (getBrokerAdmin().supportsRestart()) + { + assertThat(response, is(notNullValue())); + assertThat(response.getFrameBody(), is(instanceOf(Disposition.class))); + final Disposition receivedDisposition = (Disposition) response.getFrameBody(); + assertThat(receivedDisposition.getSettled(), is(true)); + assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class))); + assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Accepted.ACCEPTED_SYMBOL)); + } + else + { + assertThat(response, is(notNullValue())); + assertThat(response.getFrameBody(), is(instanceOf(Detach.class))); + final Detach receivedDetach = (Detach) response.getFrameBody(); + assertThat(receivedDetach.getError(), is(notNullValue())); + assertThat(receivedDetach.getError().getCondition(), is(AmqpError.PRECONDITION_FAILED)); + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
