Repository: flink
Updated Branches:
  refs/heads/master 081a7ddda -> fab1bd9dc


[FLINK-1432] [runtime] Make memory segment release robust against concurrent modifications of the collection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab1bd9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab1bd9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab1bd9d

Branch: refs/heads/master
Commit: fab1bd9dc9ab43196a3d136f63b77d4b1d58b452
Parents: 081a7dd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 11 23:34:29 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 12 12:19:04 2015 +0100

----------------------------------------------------------------------
 .../memorymanager/DefaultMemoryManager.java     | 107 ++++++++++---------
 1 file changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fab1bd9d/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index c02c417..5f84b23 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.memorymanager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
 
 public class DefaultMemoryManager implements MemoryManager {
        
@@ -266,8 +266,6 @@ public class DefaultMemoryManager implements MemoryManager {
 
        @Override
        public <T extends MemorySegment> void release(Collection<T> segments) {
-               
-               // sanity checks
                if (segments == null) {
                        return;
                }
@@ -279,49 +277,62 @@ public class DefaultMemoryManager implements 
MemoryManager {
                                throw new IllegalStateException("Memory manager 
has been shut down.");
                        }
 
-                       final Iterator<T> segmentsIterator = 
segments.iterator();
-                       
-                       AbstractInvokable lastOwner = null;
-                       Set<DefaultMemorySegment> segsForOwner = null;
+                       // since concurrent modifications to the collection
+                       // can disturb the release, we need to try potentially
+                       // multiple times
+                       boolean successfullyReleased = false;
+                       do {
+                               final Iterator<T> segmentsIterator = 
segments.iterator();
+
+                               AbstractInvokable lastOwner = null;
+                               Set<DefaultMemorySegment> segsForOwner = null;
 
-                       // go over all segments
-                       while (segmentsIterator.hasNext()) {
-                               
-                               final MemorySegment seg = 
segmentsIterator.next();
-                               if (seg.isFreed()) {
-                                       continue;
-                               }
-                               
-                               final DefaultMemorySegment defSeg = 
(DefaultMemorySegment) seg;
-                               final AbstractInvokable owner = defSeg.owner;
-                               
                                try {
-                                       // get the list of segments by this 
owner only if it is a different owner than for
-                                       // the previous one (or it is the first 
segment)
-                                       if (lastOwner != owner) {
-                                               lastOwner = owner;
-                                               segsForOwner = 
this.allocatedSegments.get(owner);
-                                       }
-                                       
-                                       // remove the segment from the list
-                                       if (segsForOwner != null) {
-                                               segsForOwner.remove(defSeg);
-                                               if (segsForOwner.isEmpty()) {
-                                                       
this.allocatedSegments.remove(owner);
+                                       // go over all segments
+                                       while (segmentsIterator.hasNext()) {
+
+                                               final MemorySegment seg = 
segmentsIterator.next();
+                                               if (seg == null || 
seg.isFreed()) {
+                                                       continue;
+                                               }
+
+                                               final DefaultMemorySegment 
defSeg = (DefaultMemorySegment) seg;
+                                               final AbstractInvokable owner = 
defSeg.owner;
+
+                                               try {
+                                                       // get the list of 
segments by this owner only if it is a different owner than for
+                                                       // the previous one (or 
it is the first segment)
+                                                       if (lastOwner != owner) 
{
+                                                               lastOwner = 
owner;
+                                                               segsForOwner = 
this.allocatedSegments.get(owner);
+                                                       }
+
+                                                       // remove the segment 
from the list
+                                                       if (segsForOwner != 
null) {
+                                                               
segsForOwner.remove(defSeg);
+                                                               if 
(segsForOwner.isEmpty()) {
+                                                                       
this.allocatedSegments.remove(owner);
+                                                               }
+                                                       }
+                                               } catch (Throwable t) {
+                                                       LOG.error("Error 
removing book-keeping reference to allocated memory segment.", t);
+                                               } finally {
+                                                       // release the memory 
in any case
+                                                       byte[] buffer = 
defSeg.destroy();
+                                                       
this.freeSegments.add(buffer);
                                                }
                                        }
+
+                                       segments.clear();
+
+                                       // the only way to exit the loop
+                                       successfullyReleased = true;
                                }
-                               catch (Throwable t) {
-                                       LOG.error("Error removing book-keeping 
reference to allocated memory segment.", t);
-                               }
-                               finally {
-                                       // release the memory in any case
-                                       byte[] buffer = defSeg.destroy();
-                                       this.freeSegments.add(buffer);
+                               catch (ConcurrentModificationException e) {
+                                       // this may happen in the case where an 
asynchronous
+                                       // call releases the memory. fall 
through the loop and try again
                                }
-                       }
-                       
-                       segments.clear();
+                       } while (!successfullyReleased);
                }
                // -------------------- END CRITICAL SECTION -------------------
        }
@@ -383,7 +394,7 @@ public class DefaultMemoryManager implements MemoryManager {
        
        // 
------------------------------------------------------------------------
        
-       private final int getNumPages(long numBytes) {
+       private int getNumPages(long numBytes) {
                if (numBytes < 0) {
                        throw new IllegalArgumentException("The number of bytes 
to allocate must not be negative.");
                }
@@ -392,11 +403,11 @@ public class DefaultMemoryManager implements 
MemoryManager {
                if (numPages <= Integer.MAX_VALUE) {
                        return (int) numPages;
                } else {
-                       throw new IllegalArgumentException("The given number of 
bytes correstponds to more than MAX_INT pages.");
+                       throw new IllegalArgumentException("The given number of 
bytes corresponds to more than MAX_INT pages.");
                }
        }
 
-       private final int getRelativeNumPages(double fraction){
+       private int getRelativeNumPages(double fraction){
                if (fraction <= 0 || fraction > 1) {
                        throw new IllegalArgumentException("The fraction of 
memory to allocate must within (0, 1].");
                }

Reply via email to