This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
commit 34170e03cc2f8ba807d22bdc7d2402d31c16b76a Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri May 28 13:41:15 2021 -0400 PROTON-2393 Allow AMQP Frames to be fired on connection established Allows for actions to be queued for fire on connect either from server accepting new client connection or client connecting successfully to a server. --- .../apache/qpid/protonj2/test/driver/AMQPTestDriver.java | 9 +++++++++ .../apache/qpid/protonj2/test/driver/ProtonTestClient.java | 14 +++++++++++++- .../qpid/protonj2/test/driver/ProtonTestConnector.java | 5 +++++ .../apache/qpid/protonj2/test/driver/ProtonTestPeer.java | 2 ++ .../apache/qpid/protonj2/test/driver/ProtonTestServer.java | 12 ++++++++++++ 5 files changed, 41 insertions(+), 1 deletion(-) diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java index bd95058..0540588 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java @@ -247,6 +247,15 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { //----- Test driver handling of decoded AMQP frames + void handleConnectedEstablished() throws AssertionError { + synchronized (script) { + ScriptedElement peekNext = script.peek(); + if (peekNext instanceof ScriptedAction) { + prcessScript(peekNext); + } + } + } + void handleHeader(AMQPHeader header) throws AssertionError { synchronized (script) { final ScriptedElement scriptEntry = script.poll(); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java index 9541f4b..324119f 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java @@ -79,6 +79,12 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable { } @Override + protected void processConnectionEstablished() { + LOG.trace("AMQP Client connected to remote."); + driver.handleConnectedEstablished(); + } + + @Override protected void processCloseRequest() { try { client.close(); @@ -89,7 +95,7 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable { @Override protected void processDriverOutput(ByteBuffer frame) { - LOG.trace("AMQP Server Channel writing: {}", frame); + LOG.trace("AMQP Client Channel writing: {}", frame); client.write(frame); } @@ -183,6 +189,12 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable { return new SimpleChannelInboundHandler<ByteBuf>() { @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + processConnectionEstablished(); + ctx.fireChannelActive(); + } + + @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception { LOG.trace("AMQP Test Client Channel read: {}", input); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java index c85dd50..5375fe3 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java @@ -72,4 +72,9 @@ public class ProtonTestConnector extends ProtonTestPeer implements Consumer<Byte protected void processDriverOutput(ByteBuffer frame) { inputConsumer.accept(frame); } + + @Override + protected void processConnectionEstablished() { + driver.handleConnectedEstablished(); + } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java index 4a1ac4e..6dc4d1a 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java @@ -105,6 +105,8 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab protected abstract void processDriverOutput(ByteBuffer frame); + protected abstract void processConnectionEstablished(); + protected void checkClosed() { if (closed.get()) { throw new IllegalStateException("The test peer is closed"); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java index dfdfff0..61e42cc 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java @@ -131,6 +131,12 @@ public class ProtonTestServer extends ProtonTestPeer { return new SimpleChannelInboundHandler<ByteBuf>() { @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + processConnectionEstablished(); + ctx.fireChannelActive(); + } + + @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception { LOG.trace("AMQP Test Server Channel read: {}", input); @@ -231,6 +237,12 @@ public class ProtonTestServer extends ProtonTestPeer { server.write(frame); } + @Override + protected void processConnectionEstablished() { + LOG.trace("AMQP Server has a client connected."); + driver.handleConnectedEstablished(); + } + protected void processDriverAssertion(AssertionError error) { LOG.trace("AMQP Server Closing due to error: {}", error.getMessage()); close(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org