[ https://issues.apache.org/jira/browse/DIRMINA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375884#comment-16375884 ]

Jonathan Valliere commented on DIRMINA-1076:
--------------------------------------------

*Why Patch 2.0 doesn't fully work*

In {{Patch 2.0}}, {{scheduleRemove(Session)}} can be called concurrently, and 
the {{Session}} can end up in the {{removingSessions}} queue twice despite the 
{{contains(Session)}} check: both threads can run the check before either one 
adds, so both see {{false}} and both add the {{Session}}.  {{Queue}} lacks an 
atomic operation like {{putIfAbsent}}, which makes the problem harder to 
solve. 
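
For illustration only, here is a minimal sketch of what an atomic 
"add if absent" could look like. It is not what any of the patches do. 
{{RemovalScheduler}}, {{poll()}} and {{queued()}} are hypothetical names, and 
the sketch assumes Java 8+ for {{ConcurrentHashMap.newKeySet()}}.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical helper, not part of MINA: de-duplicates removal requests. */
final class RemovalScheduler<S> {
    // add() on a ConcurrentHashMap-backed set is atomic: for a given element,
    // exactly one of several racing callers gets true.
    private final Set<S> pending = ConcurrentHashMap.newKeySet();
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();

    /** Returns true only for the caller that actually enqueued the session. */
    boolean scheduleRemove(S session) {
        if (pending.add(session)) {
            removingSessions.add(session);
            return true;
        }
        return false;
    }

    /** Takes the next session to remove, or null if none is queued. */
    S poll() {
        S session = removingSessions.poll();
        if (session != null) {
            pending.remove(session);
        }
        return session;
    }

    /** Number of sessions currently waiting for removal. */
    int queued() {
        return removingSessions.size();
    }
}
```

The set only de-duplicates requests while a {{Session}} is still queued; it 
says nothing about a {{Session}} that has already been polled.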

*Option 1*

One option would be to {{synchronize}} the entire code block.  I felt that it 
wasn't the safest option available, because {{scheduleRemove(Session)}} could 
still be called by an external actor after the {{Processor}} has executed 
{{removeSessions()}} but before the {{for(...)}} loop cycles, causing the 
{{Session}} to be removed twice again, even if {{selector.keys()}} were used as 
a reference point.  The sequential operations would not be atomic relative to 
each other unless {{synchronized}} blocks were added all over the place.
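
The gap described above can be sketched in a few lines. This is a simplified 
stand-in, not MINA code: {{SynchronizedRemovalQueue}} is a hypothetical class, 
and its {{poll()}} plays the role of the drain done by {{removeSessions()}}.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch: every method is synchronized, yet re-adds still pass. */
final class SynchronizedRemovalQueue<S> {
    private final Queue<S> removingSessions = new ArrayDeque<>();

    /** Check and add happen under one lock, so concurrent duplicates are blocked. */
    synchronized boolean scheduleRemove(S session) {
        if (removingSessions.contains(session)) {
            return false;
        }
        return removingSessions.add(session);
    }

    /** Stand-in for the processor draining the queue. */
    synchronized S poll() {
        return removingSessions.poll();
    }
}
```

Once {{poll()}} has taken a {{Session}} out, a later {{scheduleRemove}} for 
the same {{Session}} succeeds again, because the queue no longer remembers it. 
The lock protects each call, not the sequence of calls.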

*Option 2*

The safest option to avoid this is to create a new {{Queue}} which contains 
only the active sessions.  A {{Session}} can be taken out of it only once, so 
the new queue serves as an atomic reference of whether a {{Session}} was 
already removed or not.
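
A stripped-down sketch of that idea, assuming the membership test is done 
through {{remove(Object)}} on a {{ConcurrentLinkedQueue}}. 
{{ActiveSessionGate}} and {{destroyedCount()}} are hypothetical, and the 
counter merely stands in for the real {{destroy(session)}} call.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: the active-session queue decides who may destroy. */
final class ActiveSessionGate<S> {
    private final Queue<S> activeSessions = new ConcurrentLinkedQueue<>();
    private final AtomicInteger destroyed = new AtomicInteger();

    /** Registers a session as active. */
    void addNow(S session) {
        activeSessions.add(session);
    }

    /**
     * Only the caller whose remove() succeeds goes on to destroy the session;
     * every later or competing caller gets false and does nothing.
     */
    boolean removeNow(S session) {
        if (!activeSessions.remove(session)) {
            return false;
        }
        destroyed.incrementAndGet(); // stand-in for destroy(session)
        return true;
    }

    /** How many sessions have been destroyed so far. */
    int destroyedCount() {
        return destroyed.get();
    }
}
```

Calling {{removeNow}} twice for the same {{Session}} destroys it once, which 
is the property the session counter in the processor loop relies on.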

*Option 2, Liability*

The new {{Queue}} could become a memory leak if a {{Session}} is added or 
removed by some path that does not call {{addNow(Session)}} or 
{{removeNow(Session)}}.

*Patch 3.0 (Option 2) - Passes Unit Tests*
{code:java}
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
index 79885fa..27e7d76 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
@@ -23,6 +23,7 @@
 import java.net.PortUnreachableException;
 import java.nio.channels.ClosedSelectorException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
@@ -86,6 +87,9 @@
 
     /** A Session queue containing the newly created sessions */
     private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
+    
+    /** A queue used to store all active sessions */
+    private final Queue<S> activeSessions = new ConcurrentLinkedQueue<>();
 
     /** A queue used to store the sessions to be removed */
     private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
@@ -239,7 +243,9 @@
      *
      * @return {@link Iterator} of {@link IoSession}
      */
-    protected abstract Iterator<S> allSessions();
+    protected Iterator<S> allSessions() {
+        return Collections.unmodifiableCollection(activeSessions).iterator();
+    }
 
     /**
      * Get an {@link Iterator} for the list of {@link IoSession} found selected
@@ -411,7 +417,7 @@
     }
 
     private void scheduleRemove(S session) {
-        if (!removingSessions.contains(session)) {
+        if (!removingSessions.contains(session) && activeSessions.contains(session)) {
             removingSessions.add(session);
         }
     }
@@ -659,9 +665,20 @@
                     long currentTime = System.currentTimeMillis();
                     flush(currentTime);
 
+                    // Disconnect all sessions immediately if disposal has been
+                    // requested so that we exit this loop eventually.
+                    if (isDisposing()) {
+                        for (Iterator<S> i = allSessions(); i.hasNext();) {
+                            IoSession session = i.next();
+                            scheduleRemove((S) session);
+                        }
+                    }
+                    
                     // And manage removed sessions
                     nSessions -= removeSessions();
-
+                    
+                    assert nSessions > -1 : "Internal Session Count is Negative";
+                    
                     // Last, not least, send Idle events to the idle sessions
                     notifyIdleSessions(currentTime);
 
@@ -685,26 +702,6 @@
                         }
 
                         assert processorRef.get() == this;
-                    }
-
-                    // Disconnect all sessions immediately if disposal has been
-                    // requested so that we exit this loop eventually.
-                    if (isDisposing()) {
-                        boolean hasKeys = false;
-
-                        for (Iterator<S> i = allSessions(); i.hasNext();) {
-                            IoSession session = i.next();
-
-                            scheduleRemove((S) session);
-
-                            if (session.isActive()) {
-                                hasKeys = true;
-                            }
-                        }
-
-                        if (hasKeys) {
-                            wakeup();
-                        }
                     }
                 } catch (ClosedSelectorException cse) {
                     // If the selector has been closed, we can exit the loop
@@ -819,30 +816,41 @@
         private boolean addNow(S session) {
             boolean registered = false;
 
-            try {
-                init(session);
-                registered = true;
+        try {
+        if (activeSessions.contains(session)) {
+            return true;
+        }
 
-                // Build the filter chain of this session.
-                IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
-                chainBuilder.buildFilterChain(session.getFilterChain());
+        if (activeSessions.add(session)) {
+            init(session);
+            registered = true;
 
-                // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
-                // in AbstractIoFilterChain.fireSessionOpened().
-                // Propagate the SESSION_CREATED event up to the chain
-                IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
-                listeners.fireSessionCreated(session);
-            } catch (Exception e) {
-                ExceptionMonitor.getInstance().exceptionCaught(e);
+            // Build the filter chain of this session.
+            IoFilterChainBuilder chainBuilder = session.getService()
+                .getFilterChainBuilder();
+            chainBuilder.buildFilterChain(session.getFilterChain());
 
-                try {
-                    destroy(session);
-                } catch (Exception e1) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e1);
-                } finally {
-                    registered = false;
-                }
-            }
+            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside
+            // here
+            // in AbstractIoFilterChain.fireSessionOpened().
+            // Propagate the SESSION_CREATED event up to the chain
+            IoServiceListenerSupport listeners = ((AbstractIoService) session
+                .getService()).getListeners();
+            listeners.fireSessionCreated(session);
+        }
+        } catch (Exception e) {
+        ExceptionMonitor.getInstance().exceptionCaught(e);
+
+        if (activeSessions.remove(session)) {
+            try {
+            destroy(session);
+            } catch (Exception e1) {
+            ExceptionMonitor.getInstance().exceptionCaught(e1);
+            } finally {
+            registered = false;
+            }
+        }
+        }
 
             return registered;
         }
@@ -850,40 +858,36 @@
         private int removeSessions() {
             int removedSessions = 0;
 
-            for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
-                SessionState state = getState(session);
+        for (S session = removingSessions.poll(); session != null; session = removingSessions .poll()) {
+        SessionState state = getState(session);
 
-                // Now deal with the removal accordingly to the session's state
-                switch (state) {
-                case OPENED:
-                    // Try to remove this session
-                    if (removeNow(session)) {
-                        removedSessions++;
-                    }
+        // Now deal with the removal accordingly to the session's
+        // state
+        switch (state) {
+        case OPENED:
+        case CLOSING:
+            // Try to remove this session
+            if (removeNow(session)) {
+            removedSessions++;
+            }
 
-                    break;
+            break;
 
-                case CLOSING:
-                    // Skip if channel is already closed
-                    // In any case, remove the session from the queue
-                    removedSessions++;
-                    break;
+        case OPENING:
+            // Remove session from the newSessions queue and
+            // remove it
+            newSessions.remove(session);
 
-                case OPENING:
-                    // Remove session from the newSessions queue and
-                    // remove it
-                    newSessions.remove(session);
+            if (removeNow(session)) {
+            removedSessions++;
+            }
 
-                    if (removeNow(session)) {
-                        removedSessions++;
-                    }
+            break;
 
-                    break;
-
-                default:
-                    throw new IllegalStateException(String.valueOf(state));
-                }
-            }
+        default:
+            throw new IllegalStateException(String.valueOf(state));
+        }
+        }
 
             return removedSessions;
         }
@@ -1145,27 +1149,32 @@
         }
 
         private boolean removeNow(S session) {
-            clearWriteRequestQueue(session);
+        if (activeSessions.remove(session)) {
+        clearWriteRequestQueue(session);
 
-            try {
-                destroy(session);
-                return true;
-            } catch (Exception e) {
-                IoFilterChain filterChain = session.getFilterChain();
-                filterChain.fireExceptionCaught(e);
-            } finally {
-                try {
-                    clearWriteRequestQueue(session);
-                    ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
-                } catch (Exception e) {
-                    // The session was either destroyed or not at this point.
-                    // We do not want any exception thrown from this "cleanup" code
-                    // to change
-                    // the return value by bubbling up.
-                    IoFilterChain filterChain = session.getFilterChain();
-                    filterChain.fireExceptionCaught(e);
-                }
-            }
+        try {
+            destroy(session);
+            return true;
+        } catch (Exception e) {
+            IoFilterChain filterChain = session.getFilterChain();
+            filterChain.fireExceptionCaught(e);
+        } finally {
+            try {
+            clearWriteRequestQueue(session);
+            ((AbstractIoService) session.getService())
+                .getListeners().fireSessionDestroyed(session);
+            } catch (Exception e) {
+            // The session was either destroyed or not at this
+            // point.
+            // We do not want any exception thrown from this
+            // "cleanup" code
+            // to change
+            // the return value by bubbling up.
+            IoFilterChain filterChain = session.getFilterChain();
+            filterChain.fireExceptionCaught(e);
+            }
+        }
+        }
 
             return false;
         }
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
index 3b0fa40..fc60124 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
@@ -149,15 +149,20 @@
         }
     }
 
+//    @Override
+//    protected Iterator<NioSession> allSessions() {
+//        selectorLock.readLock().lock();
+//        
+//        try {
+//            return new IoSessionIterator(selector.keys());
+//        } finally {
+//            selectorLock.readLock().unlock();
+//        }
+//    }
+    
     @Override
     protected Iterator<NioSession> allSessions() {
-        selectorLock.readLock().lock();
-        
-        try {
-            return new IoSessionIterator(selector.keys());
-        } finally {
-            selectorLock.readLock().unlock();
-        }
+        return super.allSessions();
     }
 
     @SuppressWarnings("synthetic-access")
@@ -182,14 +187,13 @@
     @Override
     protected void destroy(NioSession session) throws Exception {
         ByteChannel ch = session.getChannel();
-        
         SelectionKey key = session.getSelectionKey();
         
         if (key != null) {
             key.cancel();
         }
         
-        if ( ch.isOpen() ) {
+        if (ch.isOpen() ) {
             ch.close();
         }
     }
{code}

> Leaking NioProcessors/NioSocketConnectors hanging in call to dispose
> --------------------------------------------------------------------
>
>                 Key: DIRMINA-1076
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-1076
>             Project: MINA
>          Issue Type: Bug
>    Affects Versions: 2.0.16
>            Reporter: Christoph John
>            Assignee: Jonathan Valliere
>            Priority: Major
>         Attachments: mina-dispose-hang.txt, mina-test-log.txt, 
> mina-test-patch.txt
>
>
> Follow-up to mailing list discussion.
> I was now able to reproduce the problem with a MINA test. Or let's say I did 
> the brute-force approach by re-running one test in an endless loop.
> I have attached a patch of AbstractIoServiceTest (against 
> [https://github.com/apache/mina/tree/2.0]) and a stack trace. After a few 
> loops the test is stuck. You can see a lot of threads hanging in dispose() 
> and the test is stuck when it tries to dispose the acceptor.
>  
> What is a little strange is that the javadoc says that 
> connector.dispose(TRUE) should not be called from an IoFutureListener, but in 
> the test it is done anyway. However, changing the parameter to FALSE does not 
> help either.
>  
>  Is there anything that can be done to prevent this hang?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
