Repository: mina Updated Branches: refs/heads/trunk 5227b9265 -> d61364885
Messages sent on SSL sessions will generate a single sent event when the last message SSL record is sent. Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/d6136488 Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/d6136488 Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/d6136488 Branch: refs/heads/trunk Commit: d61364885799138614359d600c0b932f7b0f6a8d Parents: 5227b92 Author: Jeff MAURY <[email protected]> Authored: Sun Dec 14 22:23:56 2014 +0100 Committer: Jeff MAURY <[email protected]> Committed: Sun Dec 14 22:23:56 2014 +0100 ---------------------------------------------------------------------- .../mina/session/DefaultWriteRequest.java | 21 +++++++++++++++ .../org/apache/mina/session/WriteRequest.java | 7 +++++ .../mina/transport/bio/BioUdpSession.java | 2 +- .../mina/transport/nio/AbstractNioSession.java | 4 +-- .../apache/mina/transport/nio/SslHelper.java | 4 +-- .../org/apache/mina/transport/nio/SslTest.java | 27 ++++++++++++++++++++ 6 files changed, 60 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/d6136488/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java b/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java index 2e0b5a8..39bf895 100644 --- a/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java +++ b/core/src/main/java/org/apache/mina/session/DefaultWriteRequest.java @@ -45,6 +45,8 @@ public class DefaultWriteRequest implements WriteRequest { */ private boolean secureInternal = false; + private boolean confirmRequested = true; + /** * Creates a new instance of a WriteRequest, storing the message as it was * when the IoSession.write() has been called. @@ -57,6 +59,20 @@ public class DefaultWriteRequest implements WriteRequest { } /** + * Creates a new instance of a WriteRequest, storing the message as it was + * when the IoSession.write() has been called. + * + * @param message The message to write + * @param originalMessage the original message + * @param confirmRequested whether to send an event or not + */ + public DefaultWriteRequest(Object message, Object originalMessage, boolean confirmRequested) { + this.message = message; + this.originalMessage = originalMessage; + this.confirmRequested = confirmRequested; + } + + /** * {@inheritDoc} */ @Override @@ -158,4 +174,9 @@ public class DefaultWriteRequest implements WriteRequest { public void setSecureInternal(boolean secureInternal) { this.secureInternal = secureInternal; } + + @Override + public boolean isConfirmRequested() { + return confirmRequested; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mina/blob/d6136488/core/src/main/java/org/apache/mina/session/WriteRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/session/WriteRequest.java b/core/src/main/java/org/apache/mina/session/WriteRequest.java index cb43354..c054482 100644 --- a/core/src/main/java/org/apache/mina/session/WriteRequest.java +++ b/core/src/main/java/org/apache/mina/session/WriteRequest.java @@ -84,4 +84,11 @@ public interface WriteRequest { * @param secureInternal the secure internal flag */ void setSecureInternal(boolean secureInternal); + + /** + * When this message has been set, should we send a corresponding send event + * or not. + * @return the send confirm flag + */ + boolean isConfirmRequested(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mina/blob/d6136488/core/src/main/java/org/apache/mina/transport/bio/BioUdpSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/bio/BioUdpSession.java b/core/src/main/java/org/apache/mina/transport/bio/BioUdpSession.java index a2ec97c..83f91c2 100644 --- a/core/src/main/java/org/apache/mina/transport/bio/BioUdpSession.java +++ b/core/src/main/java/org/apache/mina/transport/bio/BioUdpSession.java @@ -157,7 +157,7 @@ public class BioUdpSession extends AbstractIoSession { final Object highLevel = ((DefaultWriteRequest) writeRequest).getOriginalMessage(); - if (highLevel != null) { + if ((highLevel != null) && writeRequest.isConfirmRequested()) { processMessageSent(highLevel); } http://git-wip-us.apache.org/repos/asf/mina/blob/d6136488/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java b/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java index 41a97b0..69b0250 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java +++ b/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java @@ -231,7 +231,7 @@ public abstract class AbstractNioSession extends AbstractIoSession { final Object highLevel = ((DefaultWriteRequest) writeRequest).getOriginalMessage(); - if (highLevel != null) { + if ((highLevel != null) && writeRequest.isConfirmRequested()) { processMessageSent(highLevel); } } @@ -329,7 +329,7 @@ public abstract class AbstractNioSession extends AbstractIoSession { // generate the message sent event final Object highLevel = ((DefaultWriteRequest) writeRequest).getOriginalMessage(); - if (highLevel != null) { + if ((highLevel != null) && writeRequest.isConfirmRequested()) { processMessageSent(highLevel); } } else { http://git-wip-us.apache.org/repos/asf/mina/blob/d6136488/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java b/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java index 83cce1b..aff0beb 100644 --- a/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java +++ b/core/src/main/java/org/apache/mina/transport/nio/SslHelper.java @@ -420,9 +420,9 @@ public class SslHelper { appBuffer.flip(); done = buf.remaining() == 0; if (done) { - request = new DefaultWriteRequest(appBuffer); + request = new DefaultWriteRequest(appBuffer, buf, done); } else { - writeQueue.offer(new DefaultWriteRequest(appBuffer)); + writeQueue.offer(new DefaultWriteRequest(appBuffer, buf, done)); appBuffer = ByteBuffer.allocateDirect(appBuffer.capacity()); } break; http://git-wip-us.apache.org/repos/asf/mina/blob/d6136488/core/src/test/java/org/apache/mina/transport/nio/SslTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/mina/transport/nio/SslTest.java b/core/src/test/java/org/apache/mina/transport/nio/SslTest.java index a376ad5..0c9d8f2 100644 --- a/core/src/test/java/org/apache/mina/transport/nio/SslTest.java +++ b/core/src/test/java/org/apache/mina/transport/nio/SslTest.java @@ -473,4 +473,31 @@ public class SslTest { public void test1MMessageWithMINAClientBeforeHandshake() throws IOException, GeneralSecurityException, InterruptedException { testMessage(1024 * 1024, Client.MINA_BEFORE_HANDSHAKE); } + + @Test + public void checkThatASingleMessageSentEventIsSent() throws IOException, GeneralSecurityException, InterruptedException { + final CountDownLatch counter = new CountDownLatch(1); + final byte[] message = new byte[1024 * 1024]; + new Random().nextBytes(message); + final AtomicInteger sentCounter = new AtomicInteger(); + + NioTcpServer server = createReceivingServer(1024 * 1024, counter, null); + NioTcpClient client = new NioTcpClient(); + client.getSessionConfig().setSslContext(createSSLContext()); + client.setIoHandler(new AbstractIoHandler() { + + @Override + public void handshakeCompleted(IoSession session) { + session.write(ByteBuffer.wrap(message)); + } + + @Override + public void messageSent(IoSession session, Object message) { + sentCounter.incrementAndGet(); + } + }); + client.connect(new InetSocketAddress(server.getServerSocketChannel().socket().getLocalPort())); + assertTrue(counter.await(10, TimeUnit.SECONDS)); + assertEquals(5, sentCounter.get()); + } }
