This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new eab91726 PROTON-2739 Better handle cases of scripted connect and
reconnect
eab91726 is described below
commit eab91726b7259f5779ed73ccba0b80e0903afc10
Author: Timothy Bish <[email protected]>
AuthorDate: Fri May 26 15:23:01 2023 -0400
PROTON-2739 Better handle cases of scripted connect and reconnect
Better handle cases of a peer connection that drops in certain scenarios
and then reconnects. Adds an expectation element to let the script await
the connection drop before proceeding with additional actions etc.
---
.../qpid/protonj2/test/driver/AMQPTestDriver.java | 36 +++++++++---
.../qpid/protonj2/test/driver/DriverSessions.java | 9 +++
.../qpid/protonj2/test/driver/ScriptWriter.java | 66 ++++++++++++----------
.../expectations/ConnectionDropExpectation.java | 38 +++++++++++++
.../expectations/SaslOutcomeExpectation.java | 28 +++++++++
.../protonj2/test/driver/ProtonTestClientTest.java | 51 +++++++++++++++++
6 files changed, 189 insertions(+), 39 deletions(-)
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index 9403e01d..a13d9c3a 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -36,6 +36,8 @@ import
org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat;
import org.apache.qpid.protonj2.test.driver.codec.transport.Open;
import
org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType;
import
org.apache.qpid.protonj2.test.driver.exceptions.UnexpectedPerformativeError;
+import
org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation;
+import
org.apache.qpid.protonj2.test.driver.expectations.SaslOutcomeExpectation;
import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -255,10 +257,20 @@ public class AMQPTestDriver implements
Consumer<ByteBuffer> {
void handleConnectedDropped() throws AssertionError {
synchronized (script) {
- // For now we just reset the parse as any new connection would
need to
+ // For now we just reset the parser as any new connection would
need to
// send an AMQP header, other validation could be added if we
expand
// processing on client disconnect events.
- frameParser.resetToExpectingHeader();
+ resetToExpectingAMQPHeader();
+ // Reset connection tracking state to empty as the connection is
gone
+ // and we want new connection to be able to reuse handles etc.
+ sessions.reset();
+ // Check if the currently pending scripted expectation is for the
connection
+ // to drop in which case we remove it and unblock any waiters on
the script
+ // to complete, if this is the final scripted entry.
+ final ScriptedElement scriptEntry = script.peek();
+ if (scriptEntry instanceof ConnectionDropExpectation) {
+ processScript(script.poll());
+ }
}
}
@@ -294,12 +306,6 @@ public class AMQPTestDriver implements
Consumer<ByteBuffer> {
}
try {
- // When the outcome of SASL is read the decoder should revert
to initial state
- // as the only valid next incoming value is an AMQP header.
- if (sasl instanceof SaslOutcome) {
- frameParser.resetToExpectingHeader();
- }
-
sasl.invoke(scriptEntry, frameSize, this);
} catch (UnexpectedPerformativeError e) {
if (scriptEntry.isOptional()) {
@@ -381,6 +387,18 @@ public class AMQPTestDriver implements
Consumer<ByteBuffer> {
//----- Test driver actions
+ /**
+ * Resets the frame parser to a state where it expects to read an AMQP
Header type instead
+ * of a normal AMQP frame type. By default the parser starts in this
state and then switches
+ * to frames after the first header is read. In cases where SASL is in
use this needs to be
+ * reset to the header read state when the outcome is known. Normally the
script should handle
+ * this via a {@link SaslOutcomeExpectation} but other script variations
might want to drive
+ * this manually.
+ */
+ public void resetToExpectingAMQPHeader() {
+ this.frameParser.resetToExpectingHeader();
+ }
+
/**
* Waits indefinitely for the scripted expectations and actions to be
performed. If the script
* execution encounters an error this method will throw an {@link
AssertionError} that describes
@@ -798,7 +816,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer>
{
}
private void processScript(ScriptedElement current) {
- while (current.performAfterwards() != null && failureCause == null) {
+ if (current.performAfterwards() != null && failureCause == null) {
current.performAfterwards().perform(this);
}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
index 92a07beb..a9c3ef5a 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
@@ -71,6 +71,15 @@ public class DriverSessions {
return remoteSessions.get(remoteChannel);
}
+ public void reset() {
+ localSessions.clear();
+ remoteSessions.clear();
+
+ lastRemotelyOpenedSession = null;
+ lastLocallyOpenedSession = null;
+ lastCoordinator = null;
+ }
+
//----- Process performatives that require session level tracking
public SessionTracker handleBegin(Begin remoteBegin, UnsignedShort
remoteChannel) {
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java
index 38cddeae..62ac6899 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java
@@ -56,6 +56,7 @@ import
org.apache.qpid.protonj2.test.driver.expectations.AMQPHeaderExpectation;
import org.apache.qpid.protonj2.test.driver.expectations.AttachExpectation;
import org.apache.qpid.protonj2.test.driver.expectations.BeginExpectation;
import org.apache.qpid.protonj2.test.driver.expectations.CloseExpectation;
+import
org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation;
import org.apache.qpid.protonj2.test.driver.expectations.DeclareExpectation;
import org.apache.qpid.protonj2.test.driver.expectations.DetachExpectation;
import org.apache.qpid.protonj2.test.driver.expectations.DischargeExpectation;
@@ -88,67 +89,73 @@ public abstract class ScriptWriter {
//----- AMQP Performative expectations
public AMQPHeaderExpectation expectAMQPHeader() {
- AMQPHeaderExpectation expecting = new
AMQPHeaderExpectation(AMQPHeader.getAMQPHeader(), getDriver());
+ final AMQPHeaderExpectation expecting = new
AMQPHeaderExpectation(AMQPHeader.getAMQPHeader(), getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public OpenExpectation expectOpen() {
- OpenExpectation expecting = new OpenExpectation(getDriver());
+ final OpenExpectation expecting = new OpenExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public CloseExpectation expectClose() {
- CloseExpectation expecting = new CloseExpectation(getDriver());
+ final CloseExpectation expecting = new CloseExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public BeginExpectation expectBegin() {
- BeginExpectation expecting = new BeginExpectation(getDriver());
+ final BeginExpectation expecting = new BeginExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public EndExpectation expectEnd() {
- EndExpectation expecting = new EndExpectation(getDriver());
+ final EndExpectation expecting = new EndExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public AttachExpectation expectAttach() {
- AttachExpectation expecting = new AttachExpectation(getDriver());
+ final AttachExpectation expecting = new AttachExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public DetachExpectation expectDetach() {
- DetachExpectation expecting = new DetachExpectation(getDriver());
+ final DetachExpectation expecting = new DetachExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public FlowExpectation expectFlow() {
- FlowExpectation expecting = new FlowExpectation(getDriver());
+ final FlowExpectation expecting = new FlowExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public TransferExpectation expectTransfer() {
- TransferExpectation expecting = new TransferExpectation(getDriver());
+ final TransferExpectation expecting = new
TransferExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public DispositionExpectation expectDisposition() {
- DispositionExpectation expecting = new
DispositionExpectation(getDriver());
+ final DispositionExpectation expecting = new
DispositionExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public EmptyFrameExpectation expectEmptyFrame() {
- EmptyFrameExpectation expecting = new
EmptyFrameExpectation(getDriver());
+ final EmptyFrameExpectation expecting = new
EmptyFrameExpectation(getDriver());
+ getDriver().addScriptedElement(expecting);
+ return expecting;
+ }
+
+ public ConnectionDropExpectation expectConnectionToDrop() {
+ final ConnectionDropExpectation expecting = new
ConnectionDropExpectation();
getDriver().addScriptedElement(expecting);
return expecting;
}
@@ -156,7 +163,7 @@ public abstract class ScriptWriter {
//----- Transaction expectations
public AttachExpectation expectCoordinatorAttach() {
- AttachExpectation expecting = new AttachExpectation(getDriver());
+ final AttachExpectation expecting = new AttachExpectation(getDriver());
expecting.withRole(Role.SENDER);
expecting.withCoordinator(isA(Coordinator.class));
@@ -167,7 +174,7 @@ public abstract class ScriptWriter {
}
public DeclareExpectation expectDeclare() {
- DeclareExpectation expecting = new DeclareExpectation(getDriver());
+ final DeclareExpectation expecting = new
DeclareExpectation(getDriver());
expecting.withHandle(notNullValue());
expecting.withDeliveryId(notNullValue());
@@ -179,7 +186,7 @@ public abstract class ScriptWriter {
}
public DischargeExpectation expectDischarge() {
- DischargeExpectation expecting = new DischargeExpectation(getDriver());
+ final DischargeExpectation expecting = new
DischargeExpectation(getDriver());
expecting.withHandle(notNullValue());
expecting.withDeliveryId(notNullValue());
@@ -193,37 +200,37 @@ public abstract class ScriptWriter {
//----- SASL performative expectations
public AMQPHeaderExpectation expectSASLHeader() {
- AMQPHeaderExpectation expecting = new
AMQPHeaderExpectation(AMQPHeader.getSASLHeader(), getDriver());
+ final AMQPHeaderExpectation expecting = new
AMQPHeaderExpectation(AMQPHeader.getSASLHeader(), getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public SaslMechanismsExpectation expectSaslMechanisms() {
- SaslMechanismsExpectation expecting = new
SaslMechanismsExpectation(getDriver());
+ final SaslMechanismsExpectation expecting = new
SaslMechanismsExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public SaslInitExpectation expectSaslInit() {
- SaslInitExpectation expecting = new SaslInitExpectation(getDriver());
+ final SaslInitExpectation expecting = new
SaslInitExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public SaslChallengeExpectation expectSaslChallenge() {
- SaslChallengeExpectation expecting = new
SaslChallengeExpectation(getDriver());
+ final SaslChallengeExpectation expecting = new
SaslChallengeExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public SaslResponseExpectation expectSaslResponse() {
- SaslResponseExpectation expecting = new
SaslResponseExpectation(getDriver());
+ final SaslResponseExpectation expecting = new
SaslResponseExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
public SaslOutcomeExpectation expectSaslOutcome() {
- SaslOutcomeExpectation expecting = new
SaslOutcomeExpectation(getDriver());
+ final SaslOutcomeExpectation expecting = new
SaslOutcomeExpectation(getDriver());
getDriver().addScriptedElement(expecting);
return expecting;
}
@@ -628,9 +635,9 @@ public abstract class ScriptWriter {
//----- Utility methods for tests writing raw scripted SASL tests
public byte[] saslPlainInitialResponse(String username, String password) {
- byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
- byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
- byte[] initialResponse = new
byte[usernameBytes.length+passwordBytes.length+2];
+ final byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+ final byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+ final byte[] initialResponse = new
byte[usernameBytes.length+passwordBytes.length+2];
System.arraycopy(usernameBytes, 0, initialResponse, 1,
usernameBytes.length);
System.arraycopy(passwordBytes, 0, initialResponse, 2 +
usernameBytes.length, passwordBytes.length);
@@ -638,9 +645,9 @@ public abstract class ScriptWriter {
}
public byte[] saslXOauth2InitialResponse(String username, String password)
{
- byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
- byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
- byte[] initialResponse = new
byte[usernameBytes.length+passwordBytes.length+20];
+ final byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+ final byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+ final byte[] initialResponse = new
byte[usernameBytes.length+passwordBytes.length+20];
System.arraycopy("user=".getBytes(StandardCharsets.US_ASCII), 0,
initialResponse, 0, 5);
System.arraycopy(usernameBytes, 0, initialResponse, 5,
usernameBytes.length);
@@ -665,9 +672,9 @@ public abstract class ScriptWriter {
* @throws IllegalStateException if no Begin has yet been received from
the remote.
*/
public BeginInjectAction respondToLastBegin() {
- BeginInjectAction response = new BeginInjectAction(getDriver());
+ final BeginInjectAction response = new BeginInjectAction(getDriver());
- SessionTracker session =
getDriver().sessions().getLastRemotelyOpenedSession();
+ final SessionTracker session =
getDriver().sessions().getLastRemotelyOpenedSession();
if (session == null) {
throw new IllegalStateException("Cannot create response to Begin
before one has been received.");
}
@@ -688,8 +695,7 @@ public abstract class ScriptWriter {
* @throws IllegalStateException if no Attach has yet been received from
the remote.
*/
public AttachInjectAction respondToLastAttach() {
- AttachInjectAction response = new AttachInjectAction(getDriver());
-
+ final AttachInjectAction response = new
AttachInjectAction(getDriver());
final SessionTracker session =
getDriver().sessions().getLastRemotelyOpenedSession();
final LinkTracker link = session.getLastRemotelyOpenedLink();
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java
new file mode 100644
index 00000000..d57f26b0
--- /dev/null
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.test.driver.expectations;
+
+import org.apache.qpid.protonj2.test.driver.ScriptedExpectation;
+
+/**
+ * Expectation used to script an expected connection drop from the remotely
+ * connected peer.
+ * <p>
+ * This expectation type is best used to await a remote peer response of
+ * dropping the connection in relation to some scripted action that will
+ * result in it closing its side of the connection.
+ */
+public class ConnectionDropExpectation implements ScriptedExpectation {
+
+ /**
+ * Creates a simple connection dropped expectation instance.
+ */
+ public ConnectionDropExpectation() {
+ // No setup needed as the driver should handle this explicitly.
+ }
+}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
index 36c81e8d..eecf87e2 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
@@ -19,6 +19,7 @@ package org.apache.qpid.protonj2.test.driver.expectations;
import static org.hamcrest.CoreMatchers.equalTo;
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
+import org.apache.qpid.protonj2.test.driver.ScriptedAction;
import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte;
@@ -38,6 +39,33 @@ public class SaslOutcomeExpectation extends
AbstractExpectation<SaslOutcome> {
super(driver);
}
+ @Override
+ public ScriptedAction performAfterwards() {
+ return new ScriptedAction() {
+
+ @Override
+ public ScriptedAction queue() {
+ throw new UnsupportedOperationException("Cannot be called on
this action");
+ }
+
+ @Override
+ public ScriptedAction perform(AMQPTestDriver driver) {
+ driver.resetToExpectingAMQPHeader();
+ return null;
+ }
+
+ @Override
+ public ScriptedAction now() {
+ throw new UnsupportedOperationException("Cannot be called on
this action");
+ }
+
+ @Override
+ public ScriptedAction later(int waitTime) {
+ throw new UnsupportedOperationException("Cannot be called on
this action");
+ }
+ };
+ }
+
//----- Type specific with methods that perform simple equals checks
public SaslOutcomeExpectation withCode(byte code) {
diff --git
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
index 2cc9caa2..c671b594 100644
---
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
+++
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
@@ -55,6 +55,8 @@ class ProtonTestClientTest extends TestPeerTestsBase {
LOG.info("Test started, peer listening on: {}", remoteURI);
+ Thread.sleep(100);
+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@@ -90,6 +92,55 @@ class ProtonTestClientTest extends TestPeerTestsBase {
}
}
+ @Test
+ public void testTwoClientConnectionsHandlesOpenBeginAttach() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().respond();
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ // Server can accept two connection, although not at the same time.
+
+ try (ProtonTestClient client = new ProtonTestClient()) {
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach();
+ client.dropAfterLastHandler(10);
+ client.remoteAMQPHeader().now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+
+ try (ProtonTestClient client = new ProtonTestClient()) {
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach();
+ client.remoteAMQPHeader().now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
@Test
public void testClientDetectsUnexpectedPerformativeResponseToAMQPHeader()
throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]