Repository: flink Updated Branches: refs/heads/master 081a7ddda -> fab1bd9dc
[FLINK-1432] [runtime] Make memory segment release robust against concurrent modifications of the collection Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab1bd9d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab1bd9d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab1bd9d Branch: refs/heads/master Commit: fab1bd9dc9ab43196a3d136f63b77d4b1d58b452 Parents: 081a7dd Author: Stephan Ewen <se...@apache.org> Authored: Wed Feb 11 23:34:29 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 12 12:19:04 2015 +0100 ---------------------------------------------------------------------- .../memorymanager/DefaultMemoryManager.java | 107 ++++++++++--------- 1 file changed, 59 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fab1bd9d/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java index c02c417..5f84b23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java @@ -16,23 +16,23 @@ * limitations under the License. */ - package org.apache.flink.runtime.memorymanager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - public class DefaultMemoryManager implements MemoryManager { @@ -266,8 +266,6 @@ public class DefaultMemoryManager implements MemoryManager { @Override public <T extends MemorySegment> void release(Collection<T> segments) { - - // sanity checks if (segments == null) { return; } @@ -279,49 +277,62 @@ public class DefaultMemoryManager implements MemoryManager { throw new IllegalStateException("Memory manager has been shut down."); } - final Iterator<T> segmentsIterator = segments.iterator(); - - AbstractInvokable lastOwner = null; - Set<DefaultMemorySegment> segsForOwner = null; + // since concurrent modifications to the collection + // can disturb the release, we need to try potentially + // multiple times + boolean successfullyReleased = false; + do { + final Iterator<T> segmentsIterator = segments.iterator(); + + AbstractInvokable lastOwner = null; + Set<DefaultMemorySegment> segsForOwner = null; - // go over all segments - while (segmentsIterator.hasNext()) { - - final MemorySegment seg = segmentsIterator.next(); - if (seg.isFreed()) { - continue; - } - - final DefaultMemorySegment defSeg = (DefaultMemorySegment) seg; - final AbstractInvokable owner = defSeg.owner; - try { - // get the list of segments by this owner only if it is a different owner than for - // the previous one (or it is the first segment) - if (lastOwner != owner) { - lastOwner = owner; - segsForOwner = this.allocatedSegments.get(owner); - } - - // remove the segment from the list - if (segsForOwner != null) { - segsForOwner.remove(defSeg); - if (segsForOwner.isEmpty()) { - this.allocatedSegments.remove(owner); + // go over all segments + while (segmentsIterator.hasNext()) { + + final MemorySegment seg = segmentsIterator.next(); + if (seg == null || seg.isFreed()) { + continue; + } + + final DefaultMemorySegment defSeg = (DefaultMemorySegment) seg; + final AbstractInvokable owner = defSeg.owner; + + try { + // get the list of segments by this owner only if it is a different owner than for + // the previous one (or it is the first segment) + if (lastOwner != owner) { + lastOwner = owner; + segsForOwner = this.allocatedSegments.get(owner); + } + + // remove the segment from the list + if (segsForOwner != null) { + segsForOwner.remove(defSeg); + if (segsForOwner.isEmpty()) { + this.allocatedSegments.remove(owner); + } + } + } catch (Throwable t) { + LOG.error("Error removing book-keeping reference to allocated memory segment.", t); + } finally { + // release the memory in any case + byte[] buffer = defSeg.destroy(); + this.freeSegments.add(buffer); } } + + segments.clear(); + + // the only way to exit the loop + successfullyReleased = true; } - catch (Throwable t) { - LOG.error("Error removing book-keeping reference to allocated memory segment.", t); - } - finally { - // release the memory in any case - byte[] buffer = defSeg.destroy(); - this.freeSegments.add(buffer); + catch (ConcurrentModificationException e) { + // this may happen in the case where an asynchronous + // call releases the memory. fall through the loop and try again } - } - - segments.clear(); + } while (!successfullyReleased); } // -------------------- END CRITICAL SECTION ------------------- } @@ -383,7 +394,7 @@ public class DefaultMemoryManager implements MemoryManager { // ------------------------------------------------------------------------ - private final int getNumPages(long numBytes) { + private int getNumPages(long numBytes) { if (numBytes < 0) { throw new IllegalArgumentException("The number of bytes to allocate must not be negative."); } @@ -392,11 +403,11 @@ public class DefaultMemoryManager implements MemoryManager { if (numPages <= Integer.MAX_VALUE) { return (int) numPages; } else { - throw new IllegalArgumentException("The given number of bytes correstponds to more than MAX_INT pages."); + throw new IllegalArgumentException("The given number of bytes corresponds to more than MAX_INT pages."); } } - private final int getRelativeNumPages(double fraction){ + private int getRelativeNumPages(double fraction){ if (fraction <= 0 || fraction > 1) { throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1]."); }