Repository: systemml Updated Branches: refs/heads/master 836086e14 -> 2ec6d4d7c
[SYSTEMML-2234] Fix buffer pool memory/file leak (missing cleanup) This patch fixes severe issues of missing variable cleanup for special cases of function calls and update-in-place variables. The missing cleanup leads to unnecessary memory pressure in the buffer pool and thus unnecessary evictions as well as missing cleanup of local and HDFS files, which can lead to "no space left on device" issues on long-running scripts with many variables (e.g., DL with many epochs). Furthermore, this patch also includes a hardening of the buffer pool threshold management and thread-safe buffer pool logging. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2ec6d4d7 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2ec6d4d7 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2ec6d4d7 Branch: refs/heads/master Commit: 2ec6d4d7cb2a61e77b31aa6a7f408b1b249c0354 Parents: 836086e Author: Matthias Boehm <[email protected]> Authored: Thu Apr 5 15:06:19 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Apr 5 16:30:53 2018 -0700 ---------------------------------------------------------------------- .../runtime/controlprogram/ProgramBlock.java | 3 ++ .../controlprogram/caching/LazyWriteBuffer.java | 29 ++++++----- .../controlprogram/caching/MatrixObject.java | 2 +- .../context/SparkExecutionContext.java | 54 ++++++++++---------- .../cp/FunctionCallCPInstruction.java | 10 ++-- 5 files changed, 50 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java index 308c6a5..b9a5133 
100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java @@ -310,6 +310,9 @@ public class ProgramBlock implements ParseInfo mo.release(); moNew.release(); moNew.setUpdateType(UpdateType.INPLACE); + //cleanup old variable (e.g., remove from buffer pool) + if( ec.removeVariable(varname) != null ) + ec.cleanupCacheableData(mo); ec.setVariable(varname, moNew); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java index b728b38..ad97c4b 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -226,21 +226,22 @@ public class LazyWriteBuffer { System.out.println("WRITE BUFFER STATUS ("+position+") --"); - //print buffer meta data - System.out.println("\tWB: Buffer Meta Data: " + - "limit="+_limit+", " + - "size[bytes]="+_size+", " + - "size[elements]="+_mQueue.size()+"/"+_mQueue.size()); - - //print current buffer entries - int count = _mQueue.size(); - for( Entry<String, ByteBuffer> entry : _mQueue.entrySet() ) - { - String fname = entry.getKey(); - ByteBuffer bbuff = entry.getValue(); + synchronized( _mQueue ) { + //print buffer meta data + System.out.println("\tWB: Buffer Meta Data: " + + "limit="+_limit+", " + + "size[bytes]="+_size+", " + + "size[elements]="+_mQueue.size()+"/"+_mQueue.size()); - System.out.println("\tWB: buffer element ("+count+"): "+fname+", "+bbuff.getSize()+", "+bbuff.isShallow()); - count--; + //print current buffer entries + int count = _mQueue.size(); + for( Entry<String, 
ByteBuffer> entry : _mQueue.entrySet() ) { + String fname = entry.getKey(); + ByteBuffer bbuff = entry.getValue(); + System.out.println("\tWB: buffer element ("+count+"): " + +fname+", "+bbuff.getSize()+", "+bbuff.isShallow()); + count--; + } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index 9714a19..678ddc0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -406,7 +406,7 @@ public class MatrixObject extends CacheableData<MatrixBlock> @Override protected boolean isBelowCachingThreshold() { - return super.isBelowCachingThreshold() + return LazyWriteBuffer.getCacheBlockSize(_data) <= CACHING_THRESHOLD || getUpdateType() == UpdateType.INPLACE_PINNED; } http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index a7211ce..98c3eaa 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -1055,46 +1055,44 @@ public class SparkExecutionContext extends ExecutionContext } @Override - public void cleanupCacheableData( CacheableData<?> mo ) + public void cleanupCacheableData(CacheableData<?> mo) { //NOTE: this 
method overwrites the default behavior of cleanupMatrixObject //and hence is transparently used by rmvar instructions and other users. The //core difference is the lineage-based cleanup of RDD and broadcast variables. + if( !mo.isCleanupEnabled() ) + return; + try { - if ( mo.isCleanupEnabled() ) - { - //compute ref count only if matrix cleanup actually necessary - if ( !getVariables().hasReferences(mo) ) - { - //clean cached data - mo.clearData(); - - //clean hdfs data if no pending rdd operations on it - if( mo.isHDFSFileExists() && mo.getFileName()!=null ) { - if( mo.getRDDHandle()==null ) { - MapReduceTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName()); - } - else { //deferred file removal - RDDObject rdd = mo.getRDDHandle(); - rdd.setHDFSFilename(mo.getFileName()); - } - } - - //cleanup RDD and broadcast variables (recursive) - //note: requires that mo.clearData already removed back references - if( mo.getRDDHandle()!=null ) { - rCleanupLineageObject(mo.getRDDHandle()); + //compute ref count only if matrix cleanup actually necessary + if( !getVariables().hasReferences(mo) ) { + //clean cached data + mo.clearData(); + + //clean hdfs data if no pending rdd operations on it + if( mo.isHDFSFileExists() && mo.getFileName()!=null ) { + if( mo.getRDDHandle()==null ) { + MapReduceTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName()); } - if( mo.getBroadcastHandle()!=null ) { - rCleanupLineageObject(mo.getBroadcastHandle()); + else { //deferred file removal + RDDObject rdd = mo.getRDDHandle(); + rdd.setHDFSFilename(mo.getFileName()); } } + + //cleanup RDD and broadcast variables (recursive) + //note: requires that mo.clearData already removed back references + if( mo.getRDDHandle()!=null ) { + rCleanupLineageObject(mo.getRDDHandle()); + } + if( mo.getBroadcastHandle()!=null ) { + rCleanupLineageObject(mo.getBroadcastHandle()); + } } } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException(ex); } } 
http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java index 588d2ff..dea48bc 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java @@ -21,7 +21,6 @@ package org.apache.sysml.runtime.instructions.cp; import java.util.ArrayList; import java.util.HashSet; -import java.util.Map.Entry; import org.apache.sysml.api.DMLScript; import org.apache.sysml.lops.Lop; @@ -166,12 +165,13 @@ public class FunctionCallCPInstruction extends CPInstruction { expectRetVars.add(di.getName()); LocalVariableMap retVars = fn_ec.getVariables(); - for( Entry<String,Data> var : retVars.entrySet() ) { - if( expectRetVars.contains(var.getKey()) ) + for( String varName : new ArrayList<>(retVars.keySet()) ) { + if( expectRetVars.contains(varName) ) continue; //cleanup unexpected return values to avoid leaks - if( var.getValue() instanceof CacheableData ) - fn_ec.cleanupCacheableData((CacheableData<?>)var.getValue()); + Data var = fn_ec.removeVariable(varName); + if( var instanceof CacheableData ) + fn_ec.cleanupCacheableData((CacheableData<?>)var); } // Unpin the pinned variables
