This is an automated email from the ASF dual-hosted git repository. johnnyv pushed a commit to branch DIRMINA-1107 in repository https://gitbox.apache.org/repos/asf/mina.git
commit 99fbf4fe64d72409b19f8835c0a568022916d840 Author: johnnyv <[email protected]> AuthorDate: Sat May 18 18:30:01 2019 -0400 DIRMINA-1107 is caused from a memory inconsistency in how CAS operations are checked from within the SslHandler#flushScheduledEvents. This CAS operation can be updated to correctly check for X > 0 but still leaves situations where the CAS was incremented between the CAS loop exits and the SslHandler#sslLock lock is released resulting in a false tryLock() == false causing one or more message to be queued but never flushed until another message is queued and tryLock() succeeds to push it out. Changing tryLock() to a full lock() creates a situation where applications experience full mutual exclusion of read and write operations causing significant performance problems. This patch removes SslHandler#sslLock and SslHandler#scheduledEvents atomic. The function SslHandler#flushScheduledEvents is broken into two new methods flushMessageReceived and flushFilterWrite. SslFilter was updated to so that flushFilterWrite occurs within the SslFilter mutex to ensure concurrency. The other method flushMessageReceived MUST occur outside of the SslFilter mutex to prevent deadlocks. As usual, never put an ExecutorFilter before the SslFilter. --- .../java/org/apache/mina/filter/ssl/SslFilter.java | 125 ++++++++++----------- .../org/apache/mina/filter/ssl/SslHandler.java | 50 ++++----- .../org/apache/mina/filter/ssl/SslFilterTest.java | 13 ++- 3 files changed, 89 insertions(+), 99 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java index a8e9e31..45d124e 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java @@ -223,9 +223,9 @@ public class SslFilter extends IoFilterAdapter { } else { started = false; } + sslHandler.flushFilterWrite(); } - - sslHandler.flushScheduledEvents(); + sslHandler.flushMessageReceived(); } catch (SSLException se) { sslHandler.release(); throw se; @@ -322,9 +322,8 @@ public class SslFilter extends IoFilterAdapter { try { synchronized (sslHandler) { future = initiateClosure(nextFilter, session); + sslHandler.flushFilterWrite(); } - - sslHandler.flushScheduledEvents(); } catch (SSLException se) { sslHandler.release(); throw se; @@ -499,58 +498,58 @@ public class SslFilter extends IoFilterAdapter { SslHandler sslHandler = getSslSessionHandler(session); - synchronized (sslHandler) { - if (!isSslStarted(session) && sslHandler.isInboundDone()) { - // The SSL session must be established first before we - // can push data to the application. Store the incoming - // data into a queue for a later processing - sslHandler.scheduleMessageReceived(nextFilter, message); - } else { - IoBuffer buf = (IoBuffer) message; - - try { - if (sslHandler.isOutboundDone()) { - sslHandler.destroy(); - throw new SSLException("Outbound done"); - } - - // forward read encrypted data to SSL handler - sslHandler.messageReceived(nextFilter, buf.buf()); - - // Handle data to be forwarded to application or written to net - handleSslData(nextFilter, sslHandler); - - if (sslHandler.isInboundDone()) { - if (sslHandler.isOutboundDone()) { - sslHandler.destroy(); - } else { - initiateClosure(nextFilter, session); - } - - if (buf.hasRemaining()) { - // Forward the data received after closure. - sslHandler.scheduleMessageReceived(nextFilter, buf); - } - } - } catch (SSLException ssle) { - if (!sslHandler.isHandshakeComplete()) { - SSLException newSsle = new SSLHandshakeException("SSL handshake failed."); - newSsle.initCause(ssle); - ssle = newSsle; - - // Close the session immediately, the handshake has failed - session.closeNow(); - } else { - // Free the SSL Handler buffers - sslHandler.release(); - } - - throw ssle; - } - } - } - - sslHandler.flushScheduledEvents(); + synchronized (sslHandler) { + if (!isSslStarted(session) && sslHandler.isInboundDone()) { + // The SSL session must be established first before we + // can push data to the application. Store the incoming + // data into a queue for a later processing + sslHandler.scheduleMessageReceived(nextFilter, message); + } else { + IoBuffer buf = (IoBuffer) message; + + try { + if (sslHandler.isOutboundDone()) { + sslHandler.destroy(); + throw new SSLException("Outbound done"); + } + + // forward read encrypted data to SSL handler + sslHandler.messageReceived(nextFilter, buf.buf()); + + // Handle data to be forwarded to application or written to net + handleSslData(nextFilter, sslHandler); + + if (sslHandler.isInboundDone()) { + if (sslHandler.isOutboundDone()) { + sslHandler.destroy(); + } else { + initiateClosure(nextFilter, session); + } + + if (buf.hasRemaining()) { + // Forward the data received after closure. + sslHandler.scheduleMessageReceived(nextFilter, buf); + } + } + } catch (SSLException ssle) { + if (!sslHandler.isHandshakeComplete()) { + SSLException newSsle = new SSLHandshakeException("SSL handshake failed."); + newSsle.initCause(ssle); + ssle = newSsle; + + // Close the session immediately, the handshake has failed + session.closeNow(); + } else { + // Free the SSL Handler buffers + sslHandler.release(); + } + + throw ssle; + } + } + } + + sslHandler.flushMessageReceived(); } @Override @@ -665,10 +664,9 @@ public class SslFilter extends IoFilterAdapter { needsFlush = false; } } - } - - if (needsFlush) { - sslHandler.flushScheduledEvents(); + if (needsFlush) { + sslHandler.flushFilterWrite(); + } } } catch (SSLException se) { sslHandler.release(); @@ -700,9 +698,8 @@ public class SslFilter extends IoFilterAdapter { } }); } + sslHandler.flushFilterWrite(); } - - sslHandler.flushScheduledEvents(); } catch (SSLException se) { sslHandler.release(); throw se; @@ -746,9 +743,9 @@ public class SslFilter extends IoFilterAdapter { try { synchronized (sslHandler) { sslHandler.handshake(nextFilter); + sslHandler.flushFilterWrite(); } - - sslHandler.flushScheduledEvents(); + sslHandler.flushMessageReceived(); } catch (SSLException se) { sslHandler.release(); throw se; diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java index 3da0fe4..1870da6 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java +++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java @@ -118,12 +118,6 @@ class SslHandler { * for data being produced during the handshake). */ private boolean writingEncryptedData; - /** A lock to protect the SSL flush of events */ - private ReentrantLock sslLock = new ReentrantLock(); - - /** A counter of schedules events */ - private final AtomicInteger scheduledEvents = new AtomicInteger(0); - /** * Create a new SSL Handler, and initialize it. * @@ -305,6 +299,18 @@ class SslHandler { filterWriteEventQueue.add(new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest)); } + /* no qualifier */void flushFilterWrite() { + // Fire events only when the lock is available for this handler. + IoFilterEvent event; + + // We need synchronization here inevitably because filterWrite can be + // called simultaneously and cause 'bad record MAC' integrity error. + while ((event = filterWriteEventQueue.poll()) != null) { + NextFilter nextFilter = event.getNextFilter(); + nextFilter.filterWrite(session, (WriteRequest) event.getParameter()); + } + } + /** * Push the newly received data into a queue, waiting for the SSL session * to be fully established @@ -315,32 +321,14 @@ class SslHandler { /* no qualifier */void scheduleMessageReceived(NextFilter nextFilter, Object message) { messageReceivedEventQueue.add(new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message)); } + + /* no qualifier */void flushMessageReceived() { + IoFilterEvent event; - /* no qualifier */void flushScheduledEvents() { - scheduledEvents.incrementAndGet(); - - // Fire events only when the lock is available for this handler. - if (sslLock.tryLock()) { - IoFilterEvent event; - - try { - do { - // We need synchronization here inevitably because filterWrite can be - // called simultaneously and cause 'bad record MAC' integrity error. - while ((event = filterWriteEventQueue.poll()) != null) { - NextFilter nextFilter = event.getNextFilter(); - nextFilter.filterWrite(session, (WriteRequest) event.getParameter()); - } - - while ((event = messageReceivedEventQueue.poll()) != null) { - NextFilter nextFilter = event.getNextFilter(); - nextFilter.messageReceived(session, event.getParameter()); - } - } while (scheduledEvents.decrementAndGet() > 0); - } finally { - sslLock.unlock(); - } - } + while ((event = messageReceivedEventQueue.poll()) != null) { + NextFilter nextFilter = event.getNextFilter(); + nextFilter.messageReceived(session, event.getParameter()); + } } /** diff --git a/mina-core/src/test/java/org/apache/mina/filter/ssl/SslFilterTest.java b/mina-core/src/test/java/org/apache/mina/filter/ssl/SslFilterTest.java index 550f3c9..5838e3c 100644 --- a/mina-core/src/test/java/org/apache/mina/filter/ssl/SslFilterTest.java +++ b/mina-core/src/test/java/org/apache/mina/filter/ssl/SslFilterTest.java @@ -114,8 +114,10 @@ public class SslFilterTest { // filterWriteEventQueue. Future<?> write_scheduler = executor.submit(new Runnable() { public void run() { - test_class.scheduleFilterWrite(write_filter, new DefaultWriteRequest(new byte[] {})); - test_class.flushScheduledEvents(); + synchronized(test_class) { + test_class.scheduleFilterWrite(write_filter, new DefaultWriteRequest(new byte[] {})); + test_class.flushFilterWrite(); + } } }); @@ -128,8 +130,11 @@ public class SslFilterTest { public void filterWrite(IoSession session, WriteRequest writeRequest) { } }; - test_class.scheduleMessageReceived(receive_filter, new byte[] {}); - test_class.flushScheduledEvents(); + synchronized(test_class) { + test_class.scheduleMessageReceived(receive_filter, new byte[] {}); + } + + test_class.flushMessageReceived(); assertEquals(1, message_received_messages.size()); assertEquals(1, filter_write_requests.size());
