This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new cf366744 PROTON-2802 Make scripting message payload of a transfer
simpler
cf366744 is described below
commit cf3667444bce4ff9da8c9097d7c7a2fa2f767386
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Mar 6 17:59:05 2024 -0500
PROTON-2802 Make scripting message payload of a transfer simpler
Provide a simpler API for scripting the expected message payload that
accompanies an incoming transfer, and update the remote transfer inject
action so that its message payload API matches.
---
.../qpid/protonj2/client/impl/MessageSendTest.java | 21 +-
.../test/driver/actions/TransferInjectAction.java | 52 +-
.../driver/expectations/TransferExpectation.java | 5 +
.../messaging/AbstractMessageSectionMatcher.java | 10 +-
.../matchers/transport/TransferMessageMatcher.java | 811 +++++++++++++++++++++
.../transport/TransferPayloadCompositeMatcher.java | 2 +-
.../matchers/types/EncodedAmqpTypeMatcher.java | 16 +-
.../protonj2/test/driver/SenderHandlingTest.java | 366 ++++++++++
8 files changed, 1264 insertions(+), 19 deletions(-)
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
index 319597c3..3746ea08 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
@@ -95,19 +95,16 @@ class MessageSendTest extends ImperativeClientTestCase {
// Gates send on remote flow having been sent and received
session.openReceiver("dummy").openFuture().get();
- HeaderMatcher headerMatcher = new HeaderMatcher(true);
- headerMatcher.withDurable(true);
- headerMatcher.withPriority((byte) 1);
- headerMatcher.withTtl(65535);
- headerMatcher.withFirstAcquirer(true);
- headerMatcher.withDeliveryCount(2);
- EncodedAmqpValueMatcher bodyMatcher = new
EncodedAmqpValueMatcher("Hello World");
- TransferPayloadCompositeMatcher payloadMatcher = new
TransferPayloadCompositeMatcher();
- payloadMatcher.setHeadersMatcher(headerMatcher);
- payloadMatcher.setMessageContentMatcher(bodyMatcher);
-
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-
peer.expectTransfer().withMessageFormat(0).withPayload(payloadMatcher).accept();
+ peer.expectTransfer().withMessage().withMessageFormat(0)
+ .withHeader()
+ .withDurability(true)
+ .withPriority((byte) 1)
+ .withTimeToLive(65535)
+ .withFirstAcquirer(true)
+ .withDeliveryCount(2)
+ .also()
+ .withValue("Hello World");
peer.expectDetach().respond();
peer.expectClose().respond();
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
index dd2c6d01..e434a486 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
@@ -257,6 +257,10 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
return new FooterBuilder();
}
+ public MessageBuilder withMessage() {
+ return new MessageBuilder();
+ }
+
private Header getOrCreateHeader() {
if (header == null) {
header = new Header();
@@ -479,6 +483,11 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
public final class ApplicationPropertiesBuilder extends SectionBuilder {
+ public ApplicationPropertiesBuilder withProperty(String key, Object
value) {
+ getOrCreateApplicationProperties().setApplicationProperty(key,
value);
+ return this;
+ }
+
public ApplicationPropertiesBuilder withApplicationProperty(String
key, Object value) {
getOrCreateApplicationProperties().setApplicationProperty(key,
value);
return this;
@@ -540,7 +549,12 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
public final class FooterBuilder extends SectionBuilder {
- public FooterBuilder withFooter(Object key, Object value) {
+ public FooterBuilder withFooter(String key, Object value) {
+ getOrCreateFooter().setFooterProperty(Symbol.valueOf(key), value);
+ return this;
+ }
+
+ public FooterBuilder withFooter(Symbol key, Object value) {
getOrCreateFooter().setFooterProperty(key, value);
return this;
}
@@ -666,6 +680,42 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
}
}
+ public final class MessageBuilder extends SectionBuilder {
+
+ public MessageBuilder withMessageFormat(int format) {
+ TransferInjectAction.this.withMessageFormat(format);
+ return this;
+ }
+
+ public HeaderBuilder withHeader() {
+ return TransferInjectAction.this.withHeader();
+ }
+
+ public DeliveryAnnotationsBuilder withDeliveryAnnotations() {
+ return TransferInjectAction.this.withDeliveryAnnotations();
+ }
+
+ public MessageAnnotationsBuilder withMessageAnnotations() {
+ return TransferInjectAction.this.withMessageAnnotations();
+ }
+
+ public PropertiesBuilder withProperties() {
+ return TransferInjectAction.this.withProperties();
+ }
+
+ public ApplicationPropertiesBuilder withApplicationProperties() {
+ return TransferInjectAction.this.withApplicationProperties();
+ }
+
+ public BodySectionBuilder withBody() {
+ return TransferInjectAction.this.withBody();
+ }
+
+ public FooterBuilder withFooter() {
+ return TransferInjectAction.this.withFooter();
+ }
+ }
+
private static byte[] generateUniqueDeliveryTag() {
final byte[] tag = new byte[Long.BYTES + Long.BYTES];
final UUID uuid = UUID.randomUUID();
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
index 6476a2ac..0a0398f0 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
@@ -44,6 +44,7 @@ import
org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
import
org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMatcher;
+import
org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMessageMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@@ -285,6 +286,10 @@ public class TransferExpectation extends
AbstractExpectation<Transfer> {
return this;
}
+ /**
+  * Creates a new {@link TransferMessageMatcher} bound to this expectation and assigns
+  * it as the payload matcher of this expectation.
+  *
+  * @return the newly created message matcher.
+  */
+ public TransferMessageMatcher withMessage() {
+     final TransferMessageMatcher messageMatcher = new TransferMessageMatcher(this);
+
+     this.payloadMatcher = messageMatcher;
+
+     return messageMatcher;
+ }
+
//----- Matcher based with methods for more complex validation
public TransferExpectation withHandle(Matcher<?> m) {
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
index ac174685..9fb76013 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
@@ -41,7 +41,7 @@ public abstract class AbstractMessageSectionMatcher<T extends
AbstractMessageSec
private final Map<Object, Matcher<?>> fieldMatchers;
private Map<Object, Object> receivedFields;
- private final boolean allowTrailingBytes;
+ private boolean allowTrailingBytes;
protected AbstractMessageSectionMatcher(UnsignedLong numericDescriptor,
Symbol symbolicDescriptor, Map<Object, Matcher<?>> fieldMatchers, boolean
expectTrailingBytes) {
this.numericDescriptor = numericDescriptor;
@@ -50,6 +50,14 @@ public abstract class AbstractMessageSectionMatcher<T
extends AbstractMessageSec
this.allowTrailingBytes = expectTrailingBytes;
}
+ public void setAllowTrailingBytes(boolean allowTrailingBytes) {
+ this.allowTrailingBytes = allowTrailingBytes;
+ }
+
+ /**
+  * @return the current value of the allow trailing bytes setting of this matcher.
+  */
+ public boolean isAllowTrailingBytes() {
+     return allowTrailingBytes;
+ }
+
+ /**
+  * Misspelled variant of {@link #isAllowTrailingBytes()} kept so that existing
+  * callers continue to compile.
+  *
+  * @return the current value of the allow trailing bytes setting of this matcher.
+  *
+  * @deprecated use {@link #isAllowTrailingBytes()} instead.
+  */
+ @Deprecated
+ public boolean isAllowTrailngBytes() {
+     return isAllowTrailingBytes();
+ }
+
protected Map<Object, Matcher<?>> getMatchers() {
return fieldMatchers;
}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java
new file mode 100644
index 00000000..25cb5a80
--- /dev/null
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.test.driver.matchers.transport;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger;
+import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
+import org.apache.qpid.protonj2.test.driver.expectations.TransferExpectation;
+import
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpSequenceMatcher;
+import
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpTypeMatcher;
+import
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.StringDescription;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matcher used by a {@link TransferExpectation} to build a matcher for the
message
+ * payload that accompanies the {@link Transfer}. The matcher generally
adheres to
+ * the standard AMQP message format zero layout.
+ */
+public class TransferMessageMatcher extends TypeSafeMatcher<ByteBuffer> {
+
+ private final TransferExpectation expectation;
+
+ private HeaderMatcher headersMatcher;
+ private DeliveryAnnotationsMatcher deliveryAnnotationsMatcher;
+ private MessageAnnotationsMatcher messageAnnotationsMatcher;
+ private PropertiesMatcher propertiesMatcher;
+ private ApplicationPropertiesMatcher applicationPropertiesMatcher;
+ private List<EncodedAmqpTypeMatcher> bodySectionMatchers = new
ArrayList<>();
+ private FooterMatcher footersMatcher;
+
+ // String buckets for mismatch error descriptions.
+ private String headerMatcherFailureDescription;
+ private String deliveryAnnotationsMatcherFailureDescription;
+ private String messageAnnotationsMatcherFailureDescription;
+ private String propertiesMatcherFailureDescription;
+ private String applicationPropertiesMatcherFailureDescription;
+ private String msgContentMatcherFailureDescription;
+ private String footerMatcherFailureDescription;
+
+ public TransferMessageMatcher(TransferExpectation expectation) {
+ this.expectation = expectation;
+ }
+
+ public TransferExpectation also() {
+ return expectation;
+ }
+
+ public TransferExpectation and() {
+ return expectation;
+ }
+
+ /**
+  * Verifies the given payload against every scripted section matcher in the order
+  * header, delivery annotations, message annotations, properties, application
+  * properties, body sections and footer. Verification stops at the first section
+  * that fails, and a description of that failure is stored for later use when the
+  * mismatch is described.
+  *
+  * @param receivedBinary
+  *      the encoded payload to verify.
+  *
+  * @return true if every scripted section matched, false otherwise.
+  */
+ @Override
+ protected boolean matchesSafely(ByteBuffer receivedBinary) {
+     // Work on an independent read-only view so the position of the given buffer is not moved.
+     final ByteBuffer receivedSlice = receivedBinary.slice().asReadOnlyBuffer();
+
+     // Clear anything recorded by an earlier invocation so that a reused matcher
+     // never reports a failure that belongs to a previously checked payload.
+     headerMatcherFailureDescription = null;
+     deliveryAnnotationsMatcherFailureDescription = null;
+     messageAnnotationsMatcherFailureDescription = null;
+     propertiesMatcherFailureDescription = null;
+     applicationPropertiesMatcherFailureDescription = null;
+     msgContentMatcherFailureDescription = null;
+     footerMatcherFailureDescription = null;
+
+     int bytesConsumed = 0;
+
+     // MessageHeader Section
+     if (headersMatcher != null) {
+         try {
+             bytesConsumed += headersMatcher.getInnerMatcher().verify(receivedSlice.slice());
+             receivedSlice.position(bytesConsumed);
+         } catch (Throwable t) {
+             headerMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageHeaderMatcher: " + receivedSlice;
+             headerMatcherFailureDescription += "\nMessageHeaderMatcher generated throwable: " + t;
+
+             return false;
+         }
+     }
+
+     // DeliveryAnnotations Section
+     if (deliveryAnnotationsMatcher != null) {
+         try {
+             bytesConsumed += deliveryAnnotationsMatcher.getInnerMatcher().verify(receivedSlice.slice());
+             receivedSlice.position(bytesConsumed);
+         } catch (Throwable t) {
+             deliveryAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed " +
+                                                            "to DeliveryAnnotationsMatcher: " + receivedSlice;
+             deliveryAnnotationsMatcherFailureDescription += "\nDeliveryAnnotationsMatcher generated throwable: " + t;
+
+             return false;
+         }
+     }
+
+     // MessageAnnotations Section
+     if (messageAnnotationsMatcher != null) {
+         try {
+             bytesConsumed += messageAnnotationsMatcher.getInnerMatcher().verify(receivedSlice.slice());
+             receivedSlice.position(bytesConsumed);
+         } catch (Throwable t) {
+             messageAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                                           "MessageAnnotationsMatcher: " + receivedSlice;
+             messageAnnotationsMatcherFailureDescription += "\nMessageAnnotationsMatcher generated throwable: " + t;
+
+             return false;
+         }
+     }
+
+     // Properties Section
+     if (propertiesMatcher != null) {
+         try {
+             bytesConsumed += propertiesMatcher.getInnerMatcher().verify(receivedSlice.slice());
+             receivedSlice.position(bytesConsumed);
+         } catch (Throwable t) {
+             propertiesMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                                   "PropertiesMatcher: " + receivedSlice;
+             propertiesMatcherFailureDescription += "\nPropertiesMatcher generated throwable: " + t;
+
+             return false;
+         }
+     }
+
+     // Application Properties Section
+     if (applicationPropertiesMatcher != null) {
+         try {
+             bytesConsumed += applicationPropertiesMatcher.getInnerMatcher().verify(receivedSlice.slice());
+             receivedSlice.position(bytesConsumed);
+         } catch (Throwable t) {
+             applicationPropertiesMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                                              "ApplicationPropertiesMatcher: " + receivedSlice;
+             applicationPropertiesMatcherFailureDescription += "\nApplicationPropertiesMatcher generated throwable: " + t;
+
+             return false;
+         }
+     }
+
+     // Message Content Body Sections, each one is already a Matcher<ByteBuffer>
+     if (!bodySectionMatchers.isEmpty()) {
+         // One shared view is handed to every body matcher in turn; the bytes each
+         // matcher consumed are measured from the change in remaining bytes.
+         final ByteBuffer slicedMsgContext = receivedSlice.slice();
+
+         for (Matcher<ByteBuffer> msgContentMatcher : bodySectionMatchers) {
+             final int originalReadableBytes = slicedMsgContext.remaining();
+             final boolean contentMatches = msgContentMatcher.matches(slicedMsgContext);
+             if (!contentMatches) {
+                 Description desc = new StringDescription();
+                 msgContentMatcher.describeTo(desc);
+                 msgContentMatcher.describeMismatch(receivedSlice, desc);
+
+                 msgContentMatcherFailureDescription = "\nMessageContentMatcher mismatch Description:";
+                 msgContentMatcherFailureDescription += desc.toString();
+
+                 return false;
+             }
+
+             bytesConsumed += originalReadableBytes - slicedMsgContext.remaining();
+             receivedSlice.position(bytesConsumed);
+         }
+     }
+
+     // Footers Section
+     if (footersMatcher != null) {
+         try {
+             bytesConsumed += footersMatcher.getInnerMatcher().verify(receivedSlice.slice());
+         } catch (Throwable t) {
+             footerMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                               "FooterMatcher: " + receivedSlice;
+             footerMatcherFailureDescription += "\nFooterMatcher generated throwable: " + t;
+
+             return false;
+         }
+     }
+
+     return true;
+ }
+
+ public TransferMessageMatcher withMessageFormat(int format) {
+ this.expectation.withMessageFormat(format);
+ return this;
+ }
+
+ /**
+  * Returns the matcher used to script the expected Header section, creating it on
+  * first use.
+  *
+  * @return the {@link HeaderMatcher} owned by this message matcher.
+  */
+ public HeaderMatcher withHeader() {
+     if (headersMatcher == null) {
+         headersMatcher = new HeaderMatcher(this);
+     }
+
+     // Bytes may trail the header only when some other section has already been scripted.
+     final boolean otherSectionScripted =
+         deliveryAnnotationsMatcher != null ||
+         messageAnnotationsMatcher != null ||
+         propertiesMatcher != null ||
+         applicationPropertiesMatcher != null ||
+         !bodySectionMatchers.isEmpty() ||
+         footersMatcher != null;
+
+     headersMatcher.getInnerMatcher().setAllowTrailingBytes(otherSectionScripted);
+
+     return headersMatcher;
+ }
+
+ /**
+  * Returns the matcher used to script the expected Delivery Annotations section,
+  * creating it on first use. An already scripted header is updated to allow
+  * trailing bytes.
+  *
+  * @return the {@link DeliveryAnnotationsMatcher} owned by this message matcher.
+  */
+ public DeliveryAnnotationsMatcher withDeliveryAnnotations() {
+     if (deliveryAnnotationsMatcher == null) {
+         deliveryAnnotationsMatcher = new DeliveryAnnotationsMatcher(this);
+     }
+
+     if (headersMatcher != null) {
+         headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+
+     // Bytes may trail this section only when a section that follows it has been scripted.
+     final boolean laterSectionScripted =
+         messageAnnotationsMatcher != null ||
+         propertiesMatcher != null ||
+         applicationPropertiesMatcher != null ||
+         !bodySectionMatchers.isEmpty() ||
+         footersMatcher != null;
+
+     deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+     return deliveryAnnotationsMatcher;
+ }
+
+ /**
+  * Returns the matcher used to script the expected Message Annotations section,
+  * creating it on first use. Already scripted header and delivery annotations
+  * sections are updated to allow trailing bytes.
+  *
+  * @return the {@link MessageAnnotationsMatcher} owned by this message matcher.
+  */
+ public MessageAnnotationsMatcher withMessageAnnotations() {
+     if (messageAnnotationsMatcher == null) {
+         messageAnnotationsMatcher = new MessageAnnotationsMatcher(this);
+     }
+
+     if (headersMatcher != null) {
+         headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (deliveryAnnotationsMatcher != null) {
+         deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+
+     // Bytes may trail this section only when a section that follows it has been scripted.
+     final boolean laterSectionScripted =
+         propertiesMatcher != null ||
+         applicationPropertiesMatcher != null ||
+         !bodySectionMatchers.isEmpty() ||
+         footersMatcher != null;
+
+     messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+     return messageAnnotationsMatcher;
+ }
+
+ /**
+  * Returns the matcher used to script the expected Properties section, creating it
+  * on first use. Already scripted header, delivery annotations and message
+  * annotations sections are updated to allow trailing bytes.
+  *
+  * @return the {@link PropertiesMatcher} owned by this message matcher.
+  */
+ public PropertiesMatcher withProperties() {
+     if (propertiesMatcher == null) {
+         propertiesMatcher = new PropertiesMatcher(this);
+     }
+
+     if (headersMatcher != null) {
+         headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (deliveryAnnotationsMatcher != null) {
+         deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (messageAnnotationsMatcher != null) {
+         messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+
+     // Bytes may trail this section only when a section that follows it has been scripted.
+     final boolean laterSectionScripted =
+         applicationPropertiesMatcher != null ||
+         !bodySectionMatchers.isEmpty() ||
+         footersMatcher != null;
+
+     propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+     return propertiesMatcher;
+ }
+
+ /**
+  * Returns the matcher used to script the expected Application Properties section,
+  * creating it on first use. Already scripted header, delivery annotations, message
+  * annotations and properties sections are updated to allow trailing bytes.
+  *
+  * @return the {@link ApplicationPropertiesMatcher} owned by this message matcher.
+  */
+ public ApplicationPropertiesMatcher withApplicationProperties() {
+     if (applicationPropertiesMatcher == null) {
+         applicationPropertiesMatcher = new ApplicationPropertiesMatcher(this);
+     }
+
+     if (headersMatcher != null) {
+         headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (deliveryAnnotationsMatcher != null) {
+         deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (messageAnnotationsMatcher != null) {
+         messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (propertiesMatcher != null) {
+         propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+
+     // Bytes may trail this section only when a body section or the footer has been scripted.
+     final boolean laterSectionScripted = !bodySectionMatchers.isEmpty() || footersMatcher != null;
+
+     applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+     return applicationPropertiesMatcher;
+ }
+
+ /**
+  * Adds an expectation for an AmqpSequence body section carrying the given list.
+  * Every section scripted before this one (header, delivery annotations, message
+  * annotations, properties, application properties and any earlier body section) is
+  * updated to allow trailing bytes, matching what {@code withData} and
+  * {@code withValue} do; without that, a message scripted with one of those
+  * sections followed by a sequence body would be rejected by the earlier section.
+  *
+  * @param sequence
+  *      the list the AmqpSequence body section is expected to contain.
+  *
+  * @return this message matcher.
+  */
+ public TransferMessageMatcher withSequence(List<?> sequence) {
+     final EncodedAmqpSequenceMatcher matcher = new EncodedAmqpSequenceMatcher(sequence, footersMatcher != null);
+
+     if (headersMatcher != null) {
+         headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (deliveryAnnotationsMatcher != null) {
+         deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (messageAnnotationsMatcher != null) {
+         messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (propertiesMatcher != null) {
+         propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+     if (applicationPropertiesMatcher != null) {
+         applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+     }
+
+     if (!bodySectionMatchers.isEmpty()) {
+         bodySectionMatchers.get(bodySectionMatchers.size() - 1).setAllowTrailingBytes(true);
+     }
+
+     bodySectionMatchers.add(matcher);
+
+     return this;
+ }
+
+ public TransferMessageMatcher withData(byte[] payload) {
+ final EncodedDataMatcher matcher = new EncodedDataMatcher(payload,
footersMatcher != null);
+
+ if (headersMatcher != null) {
+ headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (deliveryAnnotationsMatcher != null) {
+
deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (messageAnnotationsMatcher != null) {
+
messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (propertiesMatcher != null) {
+ propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (applicationPropertiesMatcher != null) {
+
applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+
+ if (!bodySectionMatchers.isEmpty()) {
+ bodySectionMatchers.get(bodySectionMatchers.size() -
1).setAllowTrailingBytes(true);
+ }
+
+ bodySectionMatchers.add(matcher);
+
+ return this;
+ }
+
+ public TransferMessageMatcher withValue(Object value) {
+ final EncodedAmqpValueMatcher matcher = new
EncodedAmqpValueMatcher(value, footersMatcher != null);
+
+ if (headersMatcher != null) {
+ headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (deliveryAnnotationsMatcher != null) {
+
deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (messageAnnotationsMatcher != null) {
+
messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (propertiesMatcher != null) {
+ propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (applicationPropertiesMatcher != null) {
+
applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+
+ if (!bodySectionMatchers.isEmpty()) {
+ bodySectionMatchers.get(bodySectionMatchers.size() -
1).setAllowTrailingBytes(true);
+ }
+
+ bodySectionMatchers.add(matcher);
+
+ return this;
+ }
+
+ public FooterMatcher withFooters() {
+ if (footersMatcher == null) {
+ footersMatcher = new FooterMatcher(this);
+ }
+
+ if (headersMatcher != null) {
+ headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (deliveryAnnotationsMatcher != null) {
+
deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (messageAnnotationsMatcher != null) {
+
messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (propertiesMatcher != null) {
+ propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+ if (applicationPropertiesMatcher != null) {
+
applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+ }
+
+ if (!bodySectionMatchers.isEmpty()) {
+ bodySectionMatchers.get(bodySectionMatchers.size() -
1).setAllowTrailingBytes(true);
+ }
+
+ return footersMatcher;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a Binary encoding of a Transfer frames
payload, containing an AMQP message");
+ }
+
+ /**
+  * Appends the full payload followed by the stored failure description of the first
+  * section for which a failure was recorded, checked in the order header, delivery
+  * annotations, message annotations, properties, application properties, body and
+  * footer.
+  *
+  * @param item
+  *      the payload that failed to match.
+  * @param mismatchDescription
+  *      the description to append the failure details to.
+  */
+ @Override
+ protected void describeMismatchSafely(ByteBuffer item, Description mismatchDescription) {
+     mismatchDescription.appendText("\nActual encoded form of the full Transfer frame payload: ").appendValue(item);
+
+     // MessageHeaders Section
+     if (headerMatcherFailureDescription != null) {
+         mismatchDescription.appendText("\nMessageHeadersMatcherFailed!");
+         mismatchDescription.appendText(headerMatcherFailureDescription);
+         return;
+     }
+
+     // DeliveryAnnotations Section
+     if (deliveryAnnotationsMatcherFailureDescription != null) {
+         mismatchDescription.appendText("\nDeliveryAnnotationsMatcherFailed!");
+         mismatchDescription.appendText(deliveryAnnotationsMatcherFailureDescription);
+         return;
+     }
+
+     // MessageAnnotations Section
+     if (messageAnnotationsMatcherFailureDescription != null) {
+         mismatchDescription.appendText("\nMessageAnnotationsMatcherFailed!");
+         mismatchDescription.appendText(messageAnnotationsMatcherFailureDescription);
+         return;
+     }
+
+     // Properties Section
+     if (propertiesMatcherFailureDescription != null) {
+         mismatchDescription.appendText("\nPropertiesMatcherFailed!");
+         mismatchDescription.appendText(propertiesMatcherFailureDescription);
+         return;
+     }
+
+     // Application Properties Section
+     if (applicationPropertiesMatcherFailureDescription != null) {
+         mismatchDescription.appendText("\nApplicationPropertiesMatcherFailed!");
+         mismatchDescription.appendText(applicationPropertiesMatcherFailureDescription);
+         return;
+     }
+
+     // Message Content Body Section
+     if (msgContentMatcherFailureDescription != null) {
+         mismatchDescription.appendText("\nContentMatcherFailed!");
+         mismatchDescription.appendText(msgContentMatcherFailureDescription);
+         return;
+     }
+
+     // Footer Section, labelled as such so it is not mistaken for a body content failure
+     if (footerMatcherFailureDescription != null) {
+         mismatchDescription.appendText("\nFooterMatcherFailed!");
+         mismatchDescription.appendText(footerMatcherFailureDescription);
+     }
+ }
+
+ public static final class HeaderMatcher {
+
+ private final
org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher matcher =
+ new
org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher(false);
+
+ private final TransferMessageMatcher transferMatcher;
+
+ public HeaderMatcher(TransferMessageMatcher transferMatcher) {
+ this.transferMatcher = transferMatcher;
+ }
+
+ public TransferMessageMatcher also() {
+ return transferMatcher;
+ }
+
+ public TransferMessageMatcher and() {
+ return transferMatcher;
+ }
+
+ public HeaderMatcher withDurability(boolean durable) {
+ matcher.withDurable(equalTo(durable));
+ return this;
+ }
+
+ public HeaderMatcher withDurability(Boolean durable) {
+ matcher.withDurable(equalTo(durable));
+ return this;
+ }
+
+ public HeaderMatcher withPriority(byte priority) {
+ matcher.withPriority(equalTo(UnsignedByte.valueOf(priority)));
+ return this;
+ }
+
+ public HeaderMatcher withPriority(UnsignedByte priority) {
+ matcher.withPriority(equalTo(priority));
+ return this;
+ }
+
+ public HeaderMatcher withTimeToLive(int timeToLive) {
+ matcher.withTtl(equalTo(UnsignedInteger.valueOf(timeToLive)));
+ return this;
+ }
+
+ public HeaderMatcher withTimeToLive(long timeToLive) {
+ matcher.withTtl(equalTo(UnsignedInteger.valueOf(timeToLive)));
+ return this;
+ }
+
+ public HeaderMatcher withTimeToLive(UnsignedInteger timeToLive) {
+ matcher.withTtl(equalTo(timeToLive));
+ return this;
+ }
+
+ /**
+  * Expects the first-acquirer field of the header to equal the given value.
+  *
+  * @param firstAcquirer
+  *      the expected first-acquirer value.
+  *
+  * @return this header matcher.
+  */
+ public HeaderMatcher withFirstAcquirer(boolean firstAcquirer) {
+     matcher.withFirstAcquirer(equalTo(firstAcquirer));
+     return this;
+ }
+
+ /**
+  * Expects the first-acquirer field of the header to equal the given value.
+  *
+  * @param firstAcquirer
+  *      the expected first-acquirer value.
+  *
+  * @return this header matcher.
+  */
+ public HeaderMatcher withFirstAcquirer(Boolean firstAcquirer) {
+     matcher.withFirstAcquirer(equalTo(firstAcquirer));
+     return this;
+ }
+
+ public HeaderMatcher withDeliveryCount(int deliveryCount) {
+
matcher.withDeliveryCount(equalTo(UnsignedInteger.valueOf(deliveryCount)));
+ return this;
+ }
+
+ public HeaderMatcher withDeliveryCount(long deliveryCount) {
+
matcher.withDeliveryCount(equalTo(UnsignedInteger.valueOf(deliveryCount)));
+ return this;
+ }
+
+ public HeaderMatcher withDeliveryCount(UnsignedInteger deliveryCount) {
+ matcher.withDeliveryCount(equalTo(deliveryCount));
+ return this;
+ }
+
+ org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher
getInnerMatcher() {
+ return matcher;
+ }
+ }
+
+ public static final class DeliveryAnnotationsMatcher {
+
+ private final
org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher
matcher =
+ new
org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher(false);
+
+ private final TransferMessageMatcher transferMatcher;
+
+ public DeliveryAnnotationsMatcher(TransferMessageMatcher
transferMatcher) {
+ this.transferMatcher = transferMatcher;
+ }
+
+ public TransferMessageMatcher also() {
+ return transferMatcher;
+ }
+
+ public TransferMessageMatcher and() {
+ return transferMatcher;
+ }
+
+ public DeliveryAnnotationsMatcher withAnnotation(String key, Object
value) {
+ matcher.withEntry(Symbol.valueOf(key), value);
+ return this;
+ }
+
+ public DeliveryAnnotationsMatcher withAnnotation(Symbol key, Object
value) {
+ matcher.withEntry(key, value);
+ return this;
+ }
+
+
org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher
getInnerMatcher() {
+ return matcher;
+ }
+ }
+
+ public static final class MessageAnnotationsMatcher {
+
+ private final
org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher
matcher =
+ new
org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher(false);
+
+ private final TransferMessageMatcher transferMatcher;
+
+ public MessageAnnotationsMatcher(TransferMessageMatcher
transferMatcher) {
+ this.transferMatcher = transferMatcher;
+ }
+
+ public TransferMessageMatcher also() {
+ return transferMatcher;
+ }
+
+ public TransferMessageMatcher and() {
+ return transferMatcher;
+ }
+
+ public MessageAnnotationsMatcher withAnnotation(String key, Object
value) {
+ matcher.withEntry(Symbol.valueOf(key), value);
+ return this;
+ }
+
+ public MessageAnnotationsMatcher withAnnotation(Symbol key, Object
value) {
+ matcher.withEntry(key, value);
+ return this;
+ }
+
+
org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher
getInnerMatcher() {
+ return matcher;
+ }
+ }
+
+ public static final class PropertiesMatcher {
+
+ private final
org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher
matcher =
+ new
org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher(false);
+
+ private final TransferMessageMatcher transferMatcher;
+
+ public PropertiesMatcher(TransferMessageMatcher transferMatcher) {
+ this.transferMatcher = transferMatcher;
+ }
+
+ public TransferMessageMatcher also() {
+ return transferMatcher;
+ }
+
+ public TransferMessageMatcher and() {
+ return transferMatcher;
+ }
+
+ public PropertiesMatcher withMessageId(Object messageId) {
+ matcher.withMessageId(messageId);
+ return this;
+ }
+
+ public PropertiesMatcher withUserId(byte[] userId) {
+ matcher.withUserId(userId);
+ return this;
+ }
+
+ public PropertiesMatcher withUserId(Binary userId) {
+ matcher.withUserId(userId);
+ return this;
+ }
+
+ public PropertiesMatcher withTo(String to) {
+ matcher.withTo(to);
+ return this;
+ }
+
+ public PropertiesMatcher withSubject(String subject) {
+ matcher.withSubject(subject);
+ return this;
+ }
+
+ public PropertiesMatcher withReplyTo(String replyTo) {
+ matcher.withReplyTo(replyTo);
+ return this;
+ }
+
+ public PropertiesMatcher withCorrelationId(Object correlationId) {
+ matcher.withCorrelationId(correlationId);
+ return this;
+ }
+
+ public PropertiesMatcher withContentType(String contentType) {
+ matcher.withContentType(contentType);
+ return this;
+ }
+
+ public PropertiesMatcher withContentType(Symbol contentType) {
+ matcher.withContentType(contentType);
+ return this;
+ }
+
+ public PropertiesMatcher withContentEncoding(String contentEncoding) {
+ matcher.withContentEncoding(contentEncoding);
+ return this;
+ }
+
+ public PropertiesMatcher withContentEncoding(Symbol contentEncoding) {
+ matcher.withContentEncoding(contentEncoding);
+ return this;
+ }
+
+ public PropertiesMatcher withAbsoluteExpiryTime(int
absoluteExpiryTime) {
+ matcher.withAbsoluteExpiryTime(absoluteExpiryTime);
+ return this;
+ }
+
+ public PropertiesMatcher withAbsoluteExpiryTime(long
absoluteExpiryTime) {
+ matcher.withAbsoluteExpiryTime(absoluteExpiryTime);
+ return this;
+ }
+
+ public PropertiesMatcher withAbsoluteExpiryTime(Long
absoluteExpiryTime) {
+ matcher.withAbsoluteExpiryTime(absoluteExpiryTime);
+ return this;
+ }
+
+ public PropertiesMatcher withCreationTime(int creationTime) {
+ matcher.withCreationTime(creationTime);
+ return this;
+ }
+
+ public PropertiesMatcher withCreationTime(long creationTime) {
+ matcher.withCreationTime(creationTime);
+ return this;
+ }
+
+ public PropertiesMatcher withCreationTime(Long creationTime) {
+ matcher.withCreationTime(creationTime);
+ return this;
+ }
+
+ public PropertiesMatcher withGroupId(String groupId) {
+ matcher.withGroupId(groupId);
+ return this;
+ }
+
+ public PropertiesMatcher withGroupSequence(int groupSequence) {
+ matcher.withGroupSequence(groupSequence);
+ return this;
+ }
+
+ public PropertiesMatcher withGroupSequence(long groupSequence) {
+ matcher.withGroupSequence(groupSequence);
+ return this;
+ }
+
+ public PropertiesMatcher withGroupSequence(Long groupSequence) {
+ matcher.withGroupSequence(groupSequence);
+ return this;
+ }
+
+ public PropertiesMatcher withReplyToGroupId(String replyToGroupId) {
+ matcher.withReplyToGroupId(replyToGroupId);
+ return this;
+ }
+
+
org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher
getInnerMatcher() {
+ return matcher;
+ }
+ }
+
+ public static final class ApplicationPropertiesMatcher {
+
+ private final
org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher
matcher =
+ new
org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher(false);
+
+ private final TransferMessageMatcher transferMatcher;
+
+ public ApplicationPropertiesMatcher(TransferMessageMatcher
transferMatcher) {
+ this.transferMatcher = transferMatcher;
+ }
+
+ public TransferMessageMatcher also() {
+ return transferMatcher;
+ }
+
+ public TransferMessageMatcher and() {
+ return transferMatcher;
+ }
+
+ public ApplicationPropertiesMatcher withProperty(String key, Object
value) {
+ matcher.withEntry(key, value);
+ return this;
+ }
+
+
org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher
getInnerMatcher() {
+ return matcher;
+ }
+ }
+
+ public static final class FooterMatcher {
+
+ private final
org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher matcher =
+ new
org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher(false);
+
+ private final TransferMessageMatcher transferMatcher;
+
+ public FooterMatcher(TransferMessageMatcher transferMatcher) {
+ this.transferMatcher = transferMatcher;
+ }
+
+ public TransferMessageMatcher also() {
+ return transferMatcher;
+ }
+
+ public TransferMessageMatcher and() {
+ return transferMatcher;
+ }
+
+ public FooterMatcher withFooter(String key, Object value) {
+ matcher.withEntry(Symbol.valueOf(key), value);
+ return this;
+ }
+
+ public FooterMatcher withFooter(Symbol key, Object value) {
+ matcher.withEntry(key, value);
+ return this;
+ }
+
+ org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher
getInnerMatcher() {
+ return matcher;
+ }
+ }
+}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
index 95dcd64d..9e78b181 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
@@ -173,7 +173,7 @@ public class TransferPayloadCompositeMatcher extends
TypeSafeMatcher<ByteBuffer>
}
}
- // MessageAnnotations Section
+ // Footers Section
if (footersMatcher != null) {
try {
bytesConsumed += footersMatcher.verify(receivedSlice.slice());
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
index 2c6539a3..0eaf4de0 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
@@ -40,7 +40,7 @@ public abstract class EncodedAmqpTypeMatcher extends
TypeSafeMatcher<ByteBuffer>
private final Symbol descriptorSymbol;
private final UnsignedLong descriptorCode;
private final Object expectedValue;
- private boolean permitTrailingBytes;
+ private boolean allowTrailingBytes;
private DescribedType decodedDescribedType;
private boolean unexpectedTrailingBytes;
@@ -48,11 +48,19 @@ public abstract class EncodedAmqpTypeMatcher extends
TypeSafeMatcher<ByteBuffer>
this(symbol, code, expectedValue, false);
}
- public EncodedAmqpTypeMatcher(Symbol symbol, UnsignedLong code, Object
expectedValue, boolean permitTrailingBytes) {
+ public EncodedAmqpTypeMatcher(Symbol symbol, UnsignedLong code, Object
expectedValue, boolean allowTrailingBytes) {
this.descriptorSymbol = symbol;
this.descriptorCode = code;
this.expectedValue = expectedValue;
- this.permitTrailingBytes = permitTrailingBytes;
+ this.allowTrailingBytes = allowTrailingBytes;
+ }
+
+ public void setAllowTrailingBytes(boolean allowTrailingBytes) {
+ this.allowTrailingBytes = allowTrailingBytes;
+ }
+
+ public boolean isAllowTrailngBytes() {
+ return allowTrailingBytes;
}
protected Object getExpectedValue() {
@@ -110,7 +118,7 @@ public abstract class EncodedAmqpTypeMatcher extends
TypeSafeMatcher<ByteBuffer>
}
}
- if (decoded < length && !permitTrailingBytes) {
+ if (decoded < length && !allowTrailingBytes) {
unexpectedTrailingBytes = true;
return false;
}
diff --git
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
index e5720836..5982968b 100644
---
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
+++
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
@@ -580,4 +580,370 @@ class SenderHandlingTest extends TestPeerTestsBase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testTransferInjectAndExpectAPIs() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond().withHandle(42);
+ peer.remoteFlow().withLinkCredit(1).queue();
+ // Script the full expected message using the expect API
+ peer.expectTransfer().withMessage()
+
.withProperties().withCorrelationId("test").and()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withApplicationProperties().withProperty("ap", "pa").and()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withData(new byte[] {0, 1, 2})
+ .withHeader().withDurability(true).and()
+ .withFooters().withFooter("footer", "value");
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().withHandle(42);
+ client.expectFlow().withLinkCredit(1).withHandle(42);
+ client.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("ap", "pa").also()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withProperties().withCorrelationId("test").also()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withFooter().withFooter("footer",
"value").also()
+ .withBody().withData(new byte[] {0, 1,
2}).also()
+ .queue();
+
+ // Now start and then await the remote grant of credit and our
send of a transfer
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().withHandle(2).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.expectDetach().withHandle(42);
+ client.expectEnd();
+
+ client.remoteDetach().now();
+ client.remoteEnd().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testTransferInjectAndExpectAPIsFailOnNoMatchInHeader() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond().withHandle(42);
+ peer.remoteFlow().withLinkCredit(1).queue();
+ // Script the full expected message using the expect API
+ peer.expectTransfer().withMessage()
+
.withProperties().withCorrelationId("test").and()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withApplicationProperties().withProperty("ap", "pa").and()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withData(new byte[] {0, 1, 2})
+ .withHeader().withDurability(true).and()
+ .withFooters().withFooter("footer", "value");
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().withHandle(42);
+ client.expectFlow().withLinkCredit(1).withHandle(42);
+ client.remoteTransfer().withHeader().withDurability(false).also()
+
.withApplicationProperties().withProperty("ap", "pa").also()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withProperties().withCorrelationId("test").also()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withFooter().withFooter("footer",
"value").also()
+ .withBody().withData(new byte[] {0, 1,
2}).also()
+ .queue();
+
+ // Now start and then await the remote grant of credit and our
send of a transfer
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().withHandle(2).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ assertThrows(AssertionError.class, () ->
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void
testTransferInjectAndExpectAPIsFailOnNoMatchDeliveryAnnotations() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond().withHandle(42);
+ peer.remoteFlow().withLinkCredit(1).queue();
+ // Script the full expected message using the expect API
+ peer.expectTransfer().withMessage()
+
.withProperties().withCorrelationId("test").and()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withApplicationProperties().withProperty("ap", "pa").and()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withData(new byte[] {0, 1, 2})
+ .withHeader().withDurability(true).and()
+ .withFooters().withFooter("footer", "value");
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().withHandle(42);
+ client.expectFlow().withLinkCredit(1).withHandle(42);
+ client.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("ap", "pa").also()
+
.withDeliveryAnnotations().withAnnotation("da", "1").also()
+
.withProperties().withCorrelationId("test").also()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withFooter().withFooter("footer",
"value").also()
+ .withBody().withData(new byte[] {0, 1,
2}).also()
+ .queue();
+
+ // Now start and then await the remote grant of credit and our
send of a transfer
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().withHandle(2).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ assertThrows(AssertionError.class, () ->
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void
testTransferInjectAndExpectAPIsFailOnNoMatchMessageAnnotations() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond().withHandle(42);
+ peer.remoteFlow().withLinkCredit(1).queue();
+ // Script the full expected message using the expect API
+ peer.expectTransfer().withMessage()
+
.withProperties().withCorrelationId("test").and()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withApplicationProperties().withProperty("ap", "pa").and()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withData(new byte[] {0, 1, 2})
+ .withHeader().withDurability(true).and()
+ .withFooters().withFooter("footer", "value");
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().withHandle(42);
+ client.expectFlow().withLinkCredit(1).withHandle(42);
+ client.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("ap", "pa").also()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withProperties().withCorrelationId("test").also()
+
.withMessageAnnotations().withAnnotation("ma", "1").also()
+ .withFooter().withFooter("footer",
"value").also()
+ .withBody().withData(new byte[] {0, 1,
2}).also()
+ .queue();
+
+ // Now start and then await the remote grant of credit and our
send of a transfer
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().withHandle(2).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ assertThrows(AssertionError.class, () ->
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void testTransferInjectAndExpectAPIsFailOnNoMatchFooter() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond().withHandle(42);
+ peer.remoteFlow().withLinkCredit(1).queue();
+ // Script the full expected message using the expect API
+ peer.expectTransfer().withMessage()
+
.withProperties().withCorrelationId("test").and()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withApplicationProperties().withProperty("ap", "pa").and()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withData(new byte[] {0, 1, 2})
+ .withHeader().withDurability(true).and()
+ .withFooters().withFooter("footer", "value");
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().withHandle(42);
+ client.expectFlow().withLinkCredit(1).withHandle(42);
+ client.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("ap", "pa").also()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withProperties().withCorrelationId("test").also()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withFooter().withFooter("footer",
"1").also()
+ .withBody().withData(new byte[] {0, 1,
2}).also()
+ .queue();
+
+ // Now start and then await the remote grant of credit and our
send of a transfer
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().withHandle(2).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ assertThrows(AssertionError.class, () ->
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void testTransferInjectAndExpectAPIsFailOnMissingSection() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond().withHandle(42);
+ peer.remoteFlow().withLinkCredit(1).queue();
+ // Script the full expected message using the expect API
+ peer.expectTransfer().withMessage()
+
.withProperties().withCorrelationId("test").and()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withApplicationProperties().withProperty("ap", "pa").and()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withData(new byte[] {0, 1, 2})
+ .withHeader().withDurability(true).and()
+ .withFooters().withFooter("footer", "value");
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().withHandle(42);
+ client.expectFlow().withLinkCredit(1).withHandle(42);
+ client.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("ap", "pa").also()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withProperties().withCorrelationId("test").also()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+ .withBody().withData(new byte[] {0, 1,
2}).also()
+ .queue();
+
+ // Now start and then await the remote grant of credit and our
send of a transfer
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().withHandle(2).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ assertThrows(AssertionError.class, () ->
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void testTransferInjectAndExpectAPIsMapTypePresenceOnly() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond().withHandle(42);
+ peer.remoteFlow().withLinkCredit(1).queue();
+ // Script the expected message using the expect API
+ peer.expectTransfer().withMessage().withMessageFormat(1)
+
.withProperties().withCorrelationId("test").and()
+
.withDeliveryAnnotations().also()
+
.withApplicationProperties().and()
+
.withMessageAnnotations().also()
+ .withData(new byte[] {0, 1, 2})
+
.withHeader().withDurability(true).and()
+ .withFooters();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().withHandle(42);
+ client.expectFlow().withLinkCredit(1).withHandle(42);
+ client.remoteTransfer().withMessage().withMessageFormat(1)
+
.withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("ap", "pa").also()
+
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+
.withProperties().withCorrelationId("test").also()
+
.withMessageAnnotations().withAnnotation("ma", "am").also()
+
.withFooter().withFooter("footer", "1").also()
+ .withBody().withData(new
byte[] {0, 1, 2}).also()
+ .queue();
+
+ // Now start and then await the remote grant of credit and our
send of a transfer
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().withHandle(2).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]