This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new d81f862 PROTON-2383 Enhance the test driver link state tracking d81f862 is described below commit d81f862b85fa36ad11b66a5e42368e8feaf1ceb3 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Thu May 20 14:54:38 2021 -0400 PROTON-2383 Enhance the test driver link state tracking Improve upon the link state tracking in the test driver such that only a single link tracker instance exists for any link between the remote peer and the local end in the test driver. Wires up more auto response fill ins and checks more state as expectations and responses are executed to better relay errors into the running test. --- .../qpid/protonj2/test/driver/LinkTracker.java | 95 ++++++-- .../qpid/protonj2/test/driver/ReceiverTracker.java | 16 +- .../qpid/protonj2/test/driver/ScriptWriter.java | 31 +-- .../qpid/protonj2/test/driver/SenderTracker.java | 19 +- .../qpid/protonj2/test/driver/SessionTracker.java | 255 ++++++++++++++++----- .../actions/DetachLastCoordinatorInjectAction.java | 6 + .../test/driver/actions/TransferInjectAction.java | 4 +- .../test/driver/codec/transport/Attach.java | 8 + .../driver/expectations/DetachExpectation.java | 7 +- .../test/driver/expectations/FlowExpectation.java | 2 +- ...HandlingTest.java => ReceiverHandlingTest.java} | 141 +++++++----- ...onHandlingTest.java => SenderHandlingTest.java} | 141 +++++++----- .../protonj2/test/driver/SessionHandlingTest.java | 2 +- 13 files changed, 521 insertions(+), 206 deletions(-) diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/LinkTracker.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/LinkTracker.java index cafb9ca..617e8ae 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/LinkTracker.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/LinkTracker.java @@ -21,6 +21,7 @@ import org.apache.qpid.protonj2.test.driver.codec.messaging.Target; import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger; import org.apache.qpid.protonj2.test.driver.codec.transactions.Coordinator; import org.apache.qpid.protonj2.test.driver.codec.transport.Attach; +import org.apache.qpid.protonj2.test.driver.codec.transport.Detach; import org.apache.qpid.protonj2.test.driver.codec.transport.Flow; import org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode; import org.apache.qpid.protonj2.test.driver.codec.transport.Role; @@ -35,11 +36,15 @@ import io.netty.buffer.ByteBuf; public abstract class LinkTracker { private final SessionTracker session; - private final Attach attach; - public LinkTracker(SessionTracker session, Attach attach) { + private Attach remoteAttach; + private Detach remoteDetach; + + private Attach localAttach; + private Detach localDetach; + + public LinkTracker(SessionTracker session) { this.session = session; - this.attach = attach; } public SessionTracker getSession() { @@ -47,7 +52,11 @@ public abstract class LinkTracker { } public String getName() { - return attach.getName(); + if (remoteAttach != null) { + return remoteAttach.getName(); + } else { + return localAttach.getName(); + } } public Role getRole() { @@ -55,39 +64,97 @@ public abstract class LinkTracker { } public SenderSettleMode getSenderSettleMode() { - return attach.getSenderSettleMode() != null ? SenderSettleMode.valueOf(attach.getSenderSettleMode()) : SenderSettleMode.MIXED; + return localAttach.getSenderSettleMode() != null ? SenderSettleMode.valueOf(localAttach.getSenderSettleMode()) : SenderSettleMode.MIXED; } public ReceiverSettleMode getReceiverSettleMode() { - return attach.getReceiverSettleMode() != null ? ReceiverSettleMode.valueOf(attach.getReceiverSettleMode()) : ReceiverSettleMode.FIRST; + return localAttach.getReceiverSettleMode() != null ? ReceiverSettleMode.valueOf(localAttach.getReceiverSettleMode()) : ReceiverSettleMode.FIRST; + } + + public SenderSettleMode getRemoteSenderSettleMode() { + return remoteAttach.getSenderSettleMode() != null ? SenderSettleMode.valueOf(remoteAttach.getSenderSettleMode()) : SenderSettleMode.MIXED; + } + + public ReceiverSettleMode getRemoteReceiverSettleMode() { + return remoteAttach.getReceiverSettleMode() != null ? ReceiverSettleMode.valueOf(remoteAttach.getReceiverSettleMode()) : ReceiverSettleMode.FIRST; } public UnsignedInteger getHandle() { - return attach.getHandle(); + return localAttach.getHandle(); + } + + public UnsignedInteger getRemoteHandle() { + return remoteAttach.getHandle(); } public Source getSource() { - return attach.getSource(); + return localAttach.getSource(); } public Target getTarget() { - return attach.getTarget() instanceof Target ? (Target) attach.getTarget() : null; + return localAttach.getTarget() instanceof Target ? (Target) localAttach.getTarget() : null; } public Coordinator getCoordinator() { - return attach.getTarget() instanceof Coordinator ? (Coordinator) attach.getTarget() : null; + return localAttach.getTarget() instanceof Coordinator ? (Coordinator) localAttach.getTarget() : null; + } + + public Source getRemoteSource() { + return remoteAttach.getSource(); + } + + public Target getRemoteTarget() { + return remoteAttach.getTarget() instanceof Target ? (Target) remoteAttach.getTarget() : null; + } + + public Coordinator getRemoteCoordinator() { + return remoteAttach.getTarget() instanceof Coordinator ? (Coordinator) remoteAttach.getTarget() : null; + } + + public boolean isRemotelyAttached() { + return remoteAttach != null; } - public boolean isSender() { - return Role.RECEIVER.getValue() == attach.getRole(); + public boolean isRemotelyDetached() { + return remoteDetach != null; } - public boolean isReceiver() { - return Role.SENDER.getValue() == attach.getRole(); + public boolean isLocallyAttached() { + return localAttach != null; + } + + public boolean isLocallyDetached() { + return localDetach != null; + } + + public LinkTracker handleLocalAttach(Attach localAttach) { + this.localAttach = localAttach; + + return this; + } + + public LinkTracker handleLocalDetach(Detach localDetach) { + this.localDetach = localDetach; + + return this; + } + + public void handlerRemoteAttach(Attach remoteAttach) { + this.remoteAttach = remoteAttach; + } + + public LinkTracker handleRemoteDetach(Detach remoteDetach) { + this.remoteDetach = remoteDetach; + + return this; } protected abstract void handleTransfer(Transfer transfer, ByteBuf payload); protected abstract void handleFlow(Flow flow); + public abstract boolean isSender(); + + public abstract boolean isReceiver(); + } \ No newline at end of file diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ReceiverTracker.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ReceiverTracker.java index 6b632c8..94211a4 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ReceiverTracker.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ReceiverTracker.java @@ -16,7 +16,6 @@ */ package org.apache.qpid.protonj2.test.driver; -import org.apache.qpid.protonj2.test.driver.codec.transport.Attach; import org.apache.qpid.protonj2.test.driver.codec.transport.Flow; import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer; @@ -28,9 +27,8 @@ import io.netty.buffer.ByteBuf; */ public class ReceiverTracker extends LinkTracker { - public ReceiverTracker(SessionTracker session, Attach attach) { - super(session, attach); - // TODO Auto-generated constructor stub + public ReceiverTracker(SessionTracker session) { + super(session); } @Override @@ -42,4 +40,14 @@ public class ReceiverTracker extends LinkTracker { protected void handleFlow(Flow flow) { } + + @Override + public boolean isSender() { + return false; + } + + @Override + public boolean isReceiver() { + return true; + } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java index cf8cddd..15e2ab5 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java @@ -547,37 +547,42 @@ public abstract class ScriptWriter { public AttachInjectAction respondToLastAttach() { AttachInjectAction response = new AttachInjectAction(getDriver()); - LinkTracker link = getDriver().sessions().getLastRemotelyOpenedSession().getLastOpenedLink(); + final SessionTracker session = getDriver().sessions().getLastRemotelyOpenedSession(); + final LinkTracker link = session.getLastRemotelyOpenedLink(); + if (link == null) { throw new IllegalStateException("Cannot create response to Attach before one has been received."); } + if (link.isLocallyAttached()) { + throw new IllegalStateException("Cannot create response to Attach since a local Attach was already sent."); + } + // Populate the response using data in the locally opened link, script can override this after return. response.onChannel(link.getSession().getLocalChannel()); - response.withHandle(link.getHandle()); response.withName(link.getName()); response.withRole(link.getRole()); - response.withSndSettleMode(link.getSenderSettleMode()); - response.withRcvSettleMode(link.getReceiverSettleMode()); + response.withSndSettleMode(link.getRemoteSenderSettleMode()); + response.withRcvSettleMode(link.getRemoteReceiverSettleMode()); - if (link.getSource() != null) { - response.withSource(new Source(link.getSource())); - if (Boolean.TRUE.equals(link.getSource().getDynamic())) { + if (link.getRemoteSource() != null) { + response.withSource(new Source(link.getRemoteSource())); + if (Boolean.TRUE.equals(link.getRemoteSource().getDynamic())) { response.withSource().withAddress(UUID.randomUUID().toString()); } } - if (link.getTarget() != null) { - response.withTarget(new Target(link.getTarget())); - if (Boolean.TRUE.equals(link.getTarget().getDynamic())) { + if (link.getRemoteTarget() != null) { + response.withTarget(new Target(link.getRemoteTarget())); + if (Boolean.TRUE.equals(link.getRemoteTarget().getDynamic())) { response.withTarget().withAddress(UUID.randomUUID().toString()); } } - if (link.getCoordinator() != null) { - response.withTarget(new Coordinator(link.getCoordinator())); + if (link.getRemoteCoordinator() != null) { + response.withTarget(new Coordinator(link.getRemoteCoordinator())); } if (response.getPerformative().getInitialDeliveryCount() == null) { - if (link.getRole() == Role.SENDER) { + if (link.isSender()) { response.withInitialDeliveryCount(0); } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SenderTracker.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SenderTracker.java index 1d9c685..3bdacd6 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SenderTracker.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SenderTracker.java @@ -16,7 +16,6 @@ */ package org.apache.qpid.protonj2.test.driver; -import org.apache.qpid.protonj2.test.driver.codec.transport.Attach; import org.apache.qpid.protonj2.test.driver.codec.transport.Flow; import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer; @@ -28,17 +27,27 @@ import io.netty.buffer.ByteBuf; */ public class SenderTracker extends LinkTracker { - public SenderTracker(SessionTracker session, Attach attach) { - super(session, attach); + public SenderTracker(SessionTracker session) { + super(session); } @Override protected void handleTransfer(Transfer transfer, ByteBuf payload) { - throw new AssertionError("Sender links cannot process incoming Transfers"); + // TODO Handle sender scripted transfer by updating local state. } @Override protected void handleFlow(Flow flow) { - // TODO Auto-generated method stub + // TODO Handle flow update to sender credit state + } + + @Override + public boolean isSender() { + return true; + } + + @Override + public boolean isReceiver() { + return false; } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SessionTracker.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SessionTracker.java index c31ae7e..0d0d393 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SessionTracker.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/SessionTracker.java @@ -16,14 +16,12 @@ */ package org.apache.qpid.protonj2.test.driver; -import java.util.ArrayDeque; -import java.util.Deque; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger; import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedShort; -import org.apache.qpid.protonj2.test.driver.codec.transactions.Coordinator; import org.apache.qpid.protonj2.test.driver.codec.transport.Attach; import org.apache.qpid.protonj2.test.driver.codec.transport.Begin; import org.apache.qpid.protonj2.test.driver.codec.transport.Detach; @@ -40,10 +38,11 @@ import io.netty.buffer.ByteBuf; */ public class SessionTracker { - private final Deque<LinkTracker> remoteSenders = new ArrayDeque<>(); - private final Deque<LinkTracker> remoteReceivers = new ArrayDeque<>(); + private final Map<String, LinkTracker> senderByNameMap = new LinkedHashMap<>(); + private final Map<String, LinkTracker> receiverByNameMap = new LinkedHashMap<>(); - private final Map<UnsignedInteger, LinkTracker> trackerMap = new LinkedHashMap<>(); + private final Map<UnsignedInteger, LinkTracker> localLinks = new LinkedHashMap<>(); + private final Map<UnsignedInteger, LinkTracker> remoteLinks = new LinkedHashMap<>(); private UnsignedShort localChannel; private UnsignedShort remoteChannel; @@ -53,8 +52,6 @@ public class SessionTracker { private Begin localBegin; private End remoteEnd; private End localEnd; - private LinkTracker lastOpenedLink; - private LinkTracker lastOpenedCoordinatorLink; private final AMQPTestDriver driver; @@ -67,21 +64,91 @@ public class SessionTracker { } public LinkTracker getLastOpenedLink() { - return lastOpenedLink; + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + localLinks.forEach((key, value) -> { + linkTracker.set(value); + }); + + return linkTracker.get(); + } + + public LinkTracker getLastRemotelyOpenedLink() { + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + remoteLinks.forEach((key, value) -> { + linkTracker.set(value); + }); + + return linkTracker.get(); } public LinkTracker getLastOpenedCoordinatorLink() { - return lastOpenedCoordinatorLink; + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + localLinks.forEach((key, value) -> { + if (value.getCoordinator() != null) { + linkTracker.set(value); + } + }); + + return linkTracker.get(); + } + + public LinkTracker getLastRemotelyOpenedCoordinatorLink() { + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + remoteLinks.forEach((key, value) -> { + if (value.getRemoteCoordinator() != null) { + linkTracker.set(value); + } + }); + + return linkTracker.get(); } public LinkTracker getLastOpenedRemoteSender() { - return remoteSenders.getLast(); + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + remoteLinks.forEach((key, value) -> { + if (value.isReceiver()) { + linkTracker.set(value); + } + }); + + return linkTracker.get(); } public LinkTracker getLastOpenedRemoteReceiver() { - return remoteReceivers.getLast(); + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + remoteLinks.forEach((key, value) -> { + if (value.isSender()) { + linkTracker.set(value); + } + }); + + return linkTracker.get(); } + public LinkTracker getLastOpenedSender() { + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + localLinks.forEach((key, value) -> { + if (value.isSender()) { + linkTracker.set(value); + } + }); + + return linkTracker.get(); + } + + public LinkTracker getLastOpenedReceiver() { + final AtomicReference<LinkTracker> linkTracker = new AtomicReference<>(); + localLinks.forEach((key, value) -> { + if (value.isReceiver()) { + linkTracker.set(value); + } + }); + + return linkTracker.get(); + } + + //----- Session specific access which can provide details for expectations + public End getRemoteEnd() { return remoteEnd; } @@ -90,8 +157,6 @@ public class SessionTracker { return localEnd; } - //----- Session specific access which can provide details for expectations - public Begin getRemoteBegin() { return remoteBegin; } @@ -145,78 +210,119 @@ public class SessionTracker { } public LinkTracker handleRemoteAttach(Attach attach) { - LinkTracker linkTracker = trackerMap.get(attach.getHandle()); - - // We only populate these remote value here, never in the local side processing - // this implies that we need to check if this was remotely initiated and create - // the link tracker if none exists yet - // TODO: These SenderTracker and ReceiverTracker inversions are confusing and probably - // not going to work for future enhancements. - if (attach.getRole().equals(Role.SENDER.getValue())) { - if (linkTracker == null) { - linkTracker = new ReceiverTracker(this, attach); - } - remoteSenders.add(linkTracker); - } else { - if (linkTracker == null) { - linkTracker = new SenderTracker(this, attach); - } - remoteReceivers.add(linkTracker); + LinkTracker linkTracker = remoteLinks.get(attach.getHandle()); + + if (linkTracker != null) { + throw new AssertionError(String.format( + "Received second attach of link handle %s with name %s", attach.getHandle(), attach.getName())); + } + + final UnsignedInteger localHandleMax = localBegin == null ? UnsignedInteger.ZERO : + localBegin.getHandleMax() == null ? UnsignedInteger.MAX_VALUE : localBegin.getHandleMax(); + + if (attach.getHandle().compareTo(localHandleMax) > 0) { + throw new AssertionError("Session Handle Max [" + localHandleMax + "] Exceeded for link Attach: " + attach.getHandle()); } - if (attach.getTarget() instanceof Coordinator) { - lastOpenedCoordinatorLink = linkTracker; - driver.sessions().setLastOpenedCoordinator(lastOpenedCoordinatorLink); + // Check that the remote attach is an original link attach with no corresponding local + // attach having already been done or not as there should only ever be one instance of + // a link tracker for any given link. + linkTracker = findMatchingPendingLinkOpen(attach); + if (linkTracker == null) { + if (attach.getRole().equals(Role.SENDER.getValue())) { + linkTracker = new ReceiverTracker(this); + receiverByNameMap.put(attach.getName(), linkTracker); + } else { + linkTracker = new SenderTracker(this); + senderByNameMap.put(attach.getName(), linkTracker); + } } - lastOpenedLink = linkTracker; - trackerMap.put(attach.getHandle(), linkTracker); + remoteLinks.put(attach.getHandle(), linkTracker); + linkTracker.handlerRemoteAttach(attach); + + if (linkTracker.getRemoteCoordinator() != null) { + getDriver().sessions().setLastOpenedCoordinator(linkTracker); + } return linkTracker; } public LinkTracker handleLocalAttach(Attach attach) { - LinkTracker linkTracker = trackerMap.get(attach.getHandle()); + LinkTracker linkTracker = localLinks.get(attach.getHandle()); // Create a tracker for the local side to use to respond to remote - // performative or to use when invoking local actions. + // performative or to use when invoking local actions but don't validate + // that it was already sent one as a test might be checking remote handling. if (linkTracker == null) { if (attach.getRole().equals(Role.SENDER.getValue())) { - linkTracker = new SenderTracker(this, attach); + linkTracker = senderByNameMap.get(attach.getName()); + if (linkTracker == null) { + linkTracker = new SenderTracker(this); + senderByNameMap.put(attach.getName(), linkTracker); + } } else { - linkTracker = new ReceiverTracker(this, attach); + linkTracker = receiverByNameMap.get(attach.getName()); + if (linkTracker == null) { + linkTracker = new ReceiverTracker(this); + receiverByNameMap.put(attach.getName(), linkTracker); + } } - } - lastOpenedLink = linkTracker; - trackerMap.put(attach.getHandle(), linkTracker); + localLinks.put(attach.getHandle(), linkTracker); + linkTracker.handleLocalAttach(attach); + } return linkTracker; } public LinkTracker handleRemoteDetach(Detach detach) { - LinkTracker tracker = trackerMap.get(detach.getHandle()); + LinkTracker tracker = remoteLinks.get(detach.getHandle()); if (tracker != null) { - remoteSenders.remove(tracker); - remoteReceivers.remove(tracker); + tracker.handleRemoteDetach(detach); + remoteLinks.remove(detach.getHandle()); + + if (tracker.isLocallyDetached()) { + if (tracker.isSender()) { + senderByNameMap.remove(tracker.getName()); + } else { + receiverByNameMap.remove(tracker.getName()); + } + } + } else { + throw new AssertionError(String.format( + "Received Detach for unknown remote link with handle %s", detach.getHandle())); } return tracker; } public LinkTracker handleLocalDetach(Detach detach) { - LinkTracker tracker = trackerMap.get(detach.getHandle()); + LinkTracker tracker = localLinks.get(detach.getHandle()); - // TODO: Cleanup local state when we start tracking both sides. + // Handle the detach and remove if we knew about it, otherwise ignore as + // the test might be checked for handling of unexpected End frames etc. + if (tracker != null) { + tracker.handleLocalDetach(detach); + localLinks.remove(detach.getHandle()); + + if (tracker.isRemotelyDetached()) { + if (tracker.isSender()) { + senderByNameMap.remove(tracker.getName()); + } else { + receiverByNameMap.remove(tracker.getName()); + } + } + } return tracker; } public LinkTracker handleTransfer(Transfer transfer, ByteBuf payload) { - LinkTracker tracker = trackerMap.get(transfer.getHandle()); + LinkTracker tracker = remoteLinks.get(transfer.getHandle()); - if (tracker.getRole() == Role.SENDER) { + if (tracker.isSender()) { throw new AssertionError("Received inbound Transfer addressed to a local Sender link"); } else { tracker.handleTransfer(transfer, payload); @@ -226,6 +332,18 @@ public class SessionTracker { return tracker; } + public void handleLocalTransfer(Transfer transfer, ByteBuf payload) { + LinkTracker tracker = localLinks.get(transfer.getHandle()); + + // Pass along to local sender for processing before sending and ignore if + // we aren't tracking a link or the link is a receiver as the test might + // be checking how the remote handles invalid frames. + if (tracker != null && tracker.isSender()) { + tracker.handleTransfer(transfer, payload); + // TODO - Update session state based on transfer + } + } + public void handleDisposition(Disposition disposition) { // TODO Forward to attached links or issue errors if invalid. } @@ -234,16 +352,17 @@ public class SessionTracker { // TODO Forward to attached links or issue error if invalid. } - public void handleLocalTransfer(Transfer transfer) { - // TODO Forward to attached link or issue error if invalid. - } - public LinkTracker handleFlow(Flow flow) { LinkTracker tracker = null; if (flow.getHandle() != null) { - tracker = trackerMap.get(flow.getHandle()); - tracker.handleFlow(flow); + tracker = remoteLinks.get(flow.getHandle()); + if (tracker != null) { + tracker.handleFlow(flow); + } else { + throw new AssertionError(String.format( + "Received Flow for unknown remote link with handle %s", flow.getHandle())); + } } return tracker; @@ -254,11 +373,33 @@ public class SessionTracker { for (long i = 0; i <= HANDLE_MAX.longValue(); ++i) { final UnsignedInteger handle = UnsignedInteger.valueOf(i); - if (!trackerMap.containsKey(handle)) { + if (!localLinks.containsKey(handle)) { return handle; } } throw new IllegalStateException("no local handle available for allocation"); } + + private LinkTracker findMatchingPendingLinkOpen(Attach remoteAttach) { + for (LinkTracker link : senderByNameMap.values()) { + if (link.getName().equals(remoteAttach.getName()) && + !link.isRemotelyAttached() && + remoteAttach.isReceiver()) { + + return link; + } + } + + for (LinkTracker link : receiverByNameMap.values()) { + if (link.getName().equals(remoteAttach.getName()) && + !link.isRemotelyAttached() && + remoteAttach.isSender()) { + + return link; + } + } + + return null; + } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java index 1a80542..b4b1441 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java @@ -41,7 +41,13 @@ public class DetachLastCoordinatorInjectAction extends DetachInjectAction { throw new AssertionError("Cannot send coordinator dectach as scripted, no active coordinator found."); } + if (!tracker.isLocallyAttached()) { + // TODO: We could attempt to create an attach here and send that first + throw new AssertionError("Cannot send coordinator dectach as scripted, Coordinator link was not locally attached."); + } + onChannel(tracker.getSession().getLocalChannel().intValue()); + getPerformative().setHandle(tracker.getHandle()); } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java index a997ebc..82a44a3 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java @@ -96,7 +96,7 @@ public class TransferInjectAction extends AbstractPerformativeInjectAction<Trans // Auto select last opened receiver on last opened session. Later an option could // be added to allow forcing the handle to be null for testing specification requirements. if (transfer.getHandle() == null) { - transfer.setHandle(driver.sessions().getLastLocallyOpenedSession().getLastOpenedRemoteReceiver().getHandle()); + transfer.setHandle(driver.sessions().getLastLocallyOpenedSession().getLastOpenedSender().getHandle()); } final SessionTracker session = driver.sessions().getSessionFromLocalChannel(UnsignedShort.valueOf(onChannel())); @@ -108,7 +108,7 @@ public class TransferInjectAction extends AbstractPerformativeInjectAction<Trans // next Id from the driver as well as checking for a session and using last // created one if none set. - session.handleLocalTransfer(transfer); + session.handleLocalTransfer(transfer, getPayload()); } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transport/Attach.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transport/Attach.java index 45a18aa..b924f7d 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transport/Attach.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transport/Attach.java @@ -69,6 +69,14 @@ public class Attach extends PerformativeDescribedType { super(Field.values().length, described); } + public boolean isSender() { + return getRole().booleanValue() == false; + } + + public boolean isReceiver() { + return getRole().booleanValue() == true; + } + @Override public Symbol getDescriptor() { return DESCRIPTOR_SYMBOL; diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DetachExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DetachExpectation.java index 00f4c21..3c1cbb1 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DetachExpectation.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DetachExpectation.java @@ -82,6 +82,11 @@ public class DetachExpectation extends AbstractExpectation<Detach> { final LinkTracker link = session.handleRemoteDetach(detach); + if (link == null) { + throw new AssertionError(String.format( + "Received Detach on channel [%d] that has no matching Attached link for that remote handle. ", detach.getHandle())); + } + if (response != null) { // Input was validated now populate response with auto values where not configured // to say otherwise by the test. @@ -90,7 +95,7 @@ public class DetachExpectation extends AbstractExpectation<Detach> { } if (response.getPerformative().getHandle() == null) { - response.withHandle(detach.getHandle()); + response.withHandle(link.getHandle()); } if (response.getPerformative().getClosed() == null) { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/FlowExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/FlowExpectation.java index cfa369c..49dc971 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/FlowExpectation.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/FlowExpectation.java @@ -112,7 +112,7 @@ public class FlowExpectation extends AbstractExpectation<Flow> { } if (response.getPerformative().getHandle() == null && linkTracker != null) { - response.withHandle(linkTracker.getHandle()); //TODO: this is wrong, need a lookup for the local link and then get its remote handle. + response.withHandle(linkTracker.getHandle()); } // TODO: blow up on response if credit not populated? diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java similarity index 67% copy from protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java copy to protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java index caf3cb7..47a3357 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.qpid.protonj2.test.driver; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -29,21 +30,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Tests for the test driver session handling from both client and server perspectives. + * Tests for the test driver remote sender handling from both client and server perspectives. */ @Timeout(20) -class SessionHandlingTest extends TestPeerTestsBase { +class ReceiverHandlingTest extends TestPeerTestsBase { - private static final Logger LOG = LoggerFactory.getLogger(SessionHandlingTest.class); + private static final Logger LOG = LoggerFactory.getLogger(ReceiverHandlingTest.class); @Test - public void testSessionTrackingWithClientOpensSession() throws Exception { + public void testReceiverTrackingWithClientOpensReceiver() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); peer.expectBegin().onChannel(0).respond(); + peer.expectAttach().ofReceiver().withHandle(0).onChannel(0).respond(); peer.expectEnd().onChannel(0).respond(); peer.start(); @@ -53,10 +55,12 @@ class SessionHandlingTest extends TestPeerTestsBase { client.expectAMQPHeader(); client.expectOpen(); client.expectBegin().onChannel(0); + client.expectAttach().ofSender().onChannel(0).withHandle(0); client.expectEnd().onChannel(0); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofReceiver().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); client.close(); @@ -68,14 +72,15 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testSessionBeginResponseUsesScriptedChannel() throws Exception { + public void testAttachResponseUsesScriptedChannel() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond().onChannel(42); - peer.expectEnd().onChannel(0).respond().onChannel(42); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.expectEnd().respond(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -85,11 +90,13 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().withRemoteChannel(0).onChannel(42); - client.expectEnd().onChannel(42); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectEnd(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -98,14 +105,15 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testWaitForCompletionFailsWhenRemoteSendEndOnWrongChannel() throws Exception { + public void testWaitForCompletionFailsWhenRemoteSendDetacgWithWrongHandle() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond().onChannel(42); - peer.expectEnd().onChannel(0).respond().onChannel(43); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respond().withHandle(42); + peer.expectDetach().respond().withHandle(43); peer.start(); URI remoteURI = peer.getServerURI(); @@ -115,27 +123,31 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().withRemoteChannel(0).onChannel(42); - client.expectEnd().onChannel(42); + client.expectBegin(); + client.expectAttach().ofSender().withHandle(42); + client.expectDetach().withHandle(42); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); - client.remoteEnd().now(); + client.remoteAttach().ofReceiver().now(); + client.remoteDetach().now(); - assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(5, TimeUnit.SECONDS)); + assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(30, TimeUnit.SECONDS)); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } @Test - public void testServerEndResponseFillsChannelsAutomaticallyIfNoneSpecified() throws Exception { + public void testServerDetachResponseFillsHandlesAutomaticallyIfNoneSpecified() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond().onChannel(42); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respond().withHandle(42); + peer.expectDetach().respond(); peer.expectEnd().respond(); peer.start(); @@ -146,11 +158,15 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().withRemoteChannel(0).onChannel(42); - client.expectEnd().onChannel(42); + client.expectBegin(); + client.expectAttach().ofSender().withHandle(42); + client.expectDetach().withHandle(42); + client.expectEnd(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofReceiver().now(); + client.remoteDetach().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -159,13 +175,14 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testServerRespondToLastBeginFeature() throws Exception { + public void testServerRespondToLastAttachFeature() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -175,22 +192,27 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); + client.expectBegin(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofReceiver().now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectBegin().withRemoteChannel(0).onChannel(42); + client.expectAttach().ofSender(); // Now we respond to the last begin we saw at the server side. peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); peer.expectEnd().respond(); - peer.respondToLastBegin().onChannel(42).now(); + peer.respondToLastAttach().now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectEnd().onChannel(42); + client.expectDetach(); + client.expectEnd(); + client.remoteDetach().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -199,18 +221,20 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testOpenAndCloseMultipleSessionsWithAutoChannelHandlingExpected() throws Exception { + public void testOpenAndCloseMultipleLinksWithAutoChannelHandlingExpected() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond(); - peer.expectBegin().onChannel(1).respond(); - peer.expectBegin().onChannel(2).respond(); - peer.expectEnd().onChannel(2).respond(); - peer.expectEnd().onChannel(1).respond(); - peer.expectEnd().onChannel(0).respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().withHandle(0).respond(); + peer.expectAttach().ofReceiver().withHandle(1).respond(); + peer.expectAttach().ofReceiver().withHandle(2).respond(); + peer.expectDetach().withHandle(2).respond(); + peer.expectDetach().withHandle(1).respond(); + peer.expectDetach().withHandle(0).respond(); + peer.expectEnd().respond(); peer.expectClose().respond(); peer.start(); @@ -220,24 +244,28 @@ class SessionHandlingTest extends TestPeerTestsBase { client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().onChannel(0); - client.expectBegin().onChannel(1); - client.expectBegin().onChannel(2); + client.expectBegin(); + client.expectAttach().ofSender().withHandle(0); + client.expectAttach().ofSender().withHandle(1); + client.expectAttach().ofSender().withHandle(2); client.connect(remoteURI.getHost(), remoteURI.getPort()); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); - client.remoteBegin().now(); - client.remoteBegin().now(); + client.remoteAttach().ofReceiver().now(); + client.remoteAttach().ofReceiver().now(); + client.remoteAttach().ofReceiver().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectEnd().onChannel(2); - client.expectEnd().onChannel(1); - client.expectEnd().onChannel(0); - - client.remoteEnd().onChannel(2).now(); - client.remoteEnd().onChannel(1).now(); - client.remoteEnd().onChannel(0).now(); + client.expectDetach().withHandle(2); + client.expectDetach().withHandle(1); + client.expectDetach().withHandle(0); + client.expectEnd(); + + client.remoteDetach().withHandle(2).now(); + client.remoteDetach().withHandle(1).now(); + client.remoteDetach().withHandle(0).now(); + client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); client.expectClose(); @@ -249,13 +277,14 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testPeerEndsConnectionIfRemoteRespondsWithToHighChannelValue() throws Exception { + public void testPeerEndsConnectionIfRemoteRespondsWithToHighHandleValue() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); - peer.expectOpen().withChannelMax(0).respond(); - peer.expectBegin(); + peer.expectOpen().respond(); + peer.expectBegin().withHandleMax(0).respond(); + peer.expectAttach().ofReceiver(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -265,17 +294,19 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); + client.expectBegin(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); - client.remoteOpen().withChannelMax(0).now(); - client.remoteBegin().now(); + client.remoteOpen().now(); + client.remoteBegin().withHandleMax(0).now(); + client.remoteAttach().ofReceiver().now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectBegin(); + client.expectAttach().ofSender(); - // Now we respond to the last begin we saw at the server side. + // Now we respond to the last attach we saw at the server side. peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.respondToLastBegin().onChannel(42).now(); + peer.respondToLastAttach().withHandle(42).now(); assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(5, TimeUnit.SECONDS)); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -283,13 +314,14 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testPeerEnforcesChannelMaxOfZeroOnPipelinedOpenBegin() throws Exception { + public void testPeerEnforcesHandleMaxOfZeroOnPipelinedOpenBeginAttach() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen(); peer.expectBegin(); + peer.expectAttach().ofReceiver(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -300,7 +332,8 @@ class SessionHandlingTest extends TestPeerTestsBase { client.expectAMQPHeader(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); - client.remoteBegin().onChannel(42).now(); + client.remoteBegin().now(); + client.remoteAttach().ofReceiver().withHandle(42).now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java similarity index 68% copy from protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java copy to protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java index caf3cb7..32b1a16 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.qpid.protonj2.test.driver; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -29,21 +30,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Tests for the test driver session handling from both client and server perspectives. + * Tests for the test driver remote sender handling from both client and server perspectives. */ @Timeout(20) -class SessionHandlingTest extends TestPeerTestsBase { +class SenderHandlingTest extends TestPeerTestsBase { - private static final Logger LOG = LoggerFactory.getLogger(SessionHandlingTest.class); + private static final Logger LOG = LoggerFactory.getLogger(SenderHandlingTest.class); @Test - public void testSessionTrackingWithClientOpensSession() throws Exception { + public void testSenderTrackingWithClientOpensSender() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); peer.expectBegin().onChannel(0).respond(); + peer.expectAttach().ofSender().withHandle(0).onChannel(0).respond(); peer.expectEnd().onChannel(0).respond(); peer.start(); @@ -53,10 +55,12 @@ class SessionHandlingTest extends TestPeerTestsBase { client.expectAMQPHeader(); client.expectOpen(); client.expectBegin().onChannel(0); + client.expectAttach().ofReceiver().onChannel(0).withHandle(0); client.expectEnd().onChannel(0); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); client.close(); @@ -68,14 +72,15 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testSessionBeginResponseUsesScriptedChannel() throws Exception { + public void testAttachResponseUsesScriptedChannel() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond().onChannel(42); - peer.expectEnd().onChannel(0).respond().onChannel(42); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.expectEnd().respond(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -85,11 +90,13 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().withRemoteChannel(0).onChannel(42); - client.expectEnd().onChannel(42); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectEnd(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -98,14 +105,15 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testWaitForCompletionFailsWhenRemoteSendEndOnWrongChannel() throws Exception { + public void testWaitForCompletionFailsWhenRemoteSendDetacgWithWrongHandle() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond().onChannel(42); - peer.expectEnd().onChannel(0).respond().onChannel(43); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.expectDetach().respond().withHandle(43); peer.start(); URI remoteURI = peer.getServerURI(); @@ -115,27 +123,31 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().withRemoteChannel(0).onChannel(42); - client.expectEnd().onChannel(42); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectDetach().withHandle(42); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); - client.remoteEnd().now(); + client.remoteAttach().ofSender().now(); + client.remoteDetach().now(); - assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(5, TimeUnit.SECONDS)); + assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(30, TimeUnit.SECONDS)); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } @Test - public void testServerEndResponseFillsChannelsAutomaticallyIfNoneSpecified() throws Exception { + public void testServerDetachResponseFillsHandlesAutomaticallyIfNoneSpecified() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond().onChannel(42); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withHandle(42); + peer.expectDetach().respond(); peer.expectEnd().respond(); peer.start(); @@ -146,11 +158,15 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().withRemoteChannel(0).onChannel(42); - client.expectEnd().onChannel(42); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(42); + client.expectDetach().withHandle(42); + client.expectEnd(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); + client.remoteDetach().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -159,13 +175,14 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testServerRespondToLastBeginFeature() throws Exception { + public void testServerRespondToLastAttachFeature() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0); + peer.expectBegin().respond(); + peer.expectAttach().ofSender(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -175,22 +192,27 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); + client.expectBegin(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectBegin().withRemoteChannel(0).onChannel(42); + client.expectAttach().ofReceiver(); // Now we respond to the last begin we saw at the server side. peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); peer.expectEnd().respond(); - peer.respondToLastBegin().onChannel(42).now(); + peer.respondToLastAttach().now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectEnd().onChannel(42); + client.expectDetach(); + client.expectEnd(); + client.remoteDetach().now(); client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -199,18 +221,20 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testOpenAndCloseMultipleSessionsWithAutoChannelHandlingExpected() throws Exception { + public void testOpenAndCloseMultipleLinksWithAutoChannelHandlingExpected() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen().respond(); - peer.expectBegin().onChannel(0).respond(); - peer.expectBegin().onChannel(1).respond(); - peer.expectBegin().onChannel(2).respond(); - peer.expectEnd().onChannel(2).respond(); - peer.expectEnd().onChannel(1).respond(); - peer.expectEnd().onChannel(0).respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().withHandle(0).respond(); + peer.expectAttach().ofSender().withHandle(1).respond(); + peer.expectAttach().ofSender().withHandle(2).respond(); + peer.expectDetach().withHandle(2).respond(); + peer.expectDetach().withHandle(1).respond(); + peer.expectDetach().withHandle(0).respond(); + peer.expectEnd().respond(); peer.expectClose().respond(); peer.start(); @@ -220,24 +244,28 @@ class SessionHandlingTest extends TestPeerTestsBase { client.expectAMQPHeader(); client.expectOpen(); - client.expectBegin().onChannel(0); - client.expectBegin().onChannel(1); - client.expectBegin().onChannel(2); + client.expectBegin(); + client.expectAttach().ofReceiver().withHandle(0); + client.expectAttach().ofReceiver().withHandle(1); + client.expectAttach().ofReceiver().withHandle(2); client.connect(remoteURI.getHost(), remoteURI.getPort()); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); client.remoteBegin().now(); - client.remoteBegin().now(); - client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); + client.remoteAttach().ofSender().now(); + client.remoteAttach().ofSender().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectEnd().onChannel(2); - client.expectEnd().onChannel(1); - client.expectEnd().onChannel(0); - - client.remoteEnd().onChannel(2).now(); - client.remoteEnd().onChannel(1).now(); - client.remoteEnd().onChannel(0).now(); + client.expectDetach().withHandle(2); + client.expectDetach().withHandle(1); + client.expectDetach().withHandle(0); + client.expectEnd(); + + client.remoteDetach().withHandle(2).now(); + client.remoteDetach().withHandle(1).now(); + client.remoteDetach().withHandle(0).now(); + client.remoteEnd().now(); client.waitForScriptToComplete(5, TimeUnit.SECONDS); client.expectClose(); @@ -249,13 +277,14 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testPeerEndsConnectionIfRemoteRespondsWithToHighChannelValue() throws Exception { + public void testPeerEndsConnectionIfRemoteRespondsWithToHighHandleValue() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); - peer.expectOpen().withChannelMax(0).respond(); - peer.expectBegin(); + peer.expectOpen().respond(); + peer.expectBegin().withHandleMax(0).respond(); + peer.expectAttach().ofSender(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -265,17 +294,19 @@ class SessionHandlingTest extends TestPeerTestsBase { client.connect(remoteURI.getHost(), remoteURI.getPort()); client.expectAMQPHeader(); client.expectOpen(); + client.expectBegin(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); - client.remoteOpen().withChannelMax(0).now(); - client.remoteBegin().now(); + client.remoteOpen().now(); + client.remoteBegin().withHandleMax(0).now(); + client.remoteAttach().ofSender().now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); - client.expectBegin(); + client.expectAttach().ofReceiver(); - // Now we respond to the last begin we saw at the server side. + // Now we respond to the last attach we saw at the server side. peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.respondToLastBegin().onChannel(42).now(); + peer.respondToLastAttach().withHandle(42).now(); assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(5, TimeUnit.SECONDS)); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -283,13 +314,14 @@ class SessionHandlingTest extends TestPeerTestsBase { } @Test - public void testPeerEnforcesChannelMaxOfZeroOnPipelinedOpenBegin() throws Exception { + public void testPeerEnforcesHandleMaxOfZeroOnPipelinedOpenBeginAttach() throws Exception { try (ProtonTestServer peer = new ProtonTestServer(); ProtonTestClient client = new ProtonTestClient()) { peer.expectAMQPHeader().respondWithAMQPHeader(); peer.expectOpen(); peer.expectBegin(); + peer.expectAttach().ofSender(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -300,7 +332,8 @@ class SessionHandlingTest extends TestPeerTestsBase { client.expectAMQPHeader(); client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); client.remoteOpen().now(); - client.remoteBegin().onChannel(42).now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().withHandle(42).now(); // Wait for the above and then script next steps client.waitForScriptToComplete(5, TimeUnit.SECONDS); diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java index caf3cb7..417ead2 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java @@ -122,7 +122,7 @@ class SessionHandlingTest extends TestPeerTestsBase { client.remoteBegin().now(); client.remoteEnd().now(); - assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(5, TimeUnit.SECONDS)); + assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(30, TimeUnit.SECONDS)); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org