Repository: systemml
Updated Branches:
  refs/heads/master 836086e14 -> 2ec6d4d7c


[SYSTEMML-2234] Fix buffer pool memory/file leak (missing cleanup)

This patch fixes severe issues of missing variable cleanup in special
cases of function calls and update-in-place variables. The missing
cleanup causes unnecessary memory pressure in the buffer pool, and thus
unnecessary evictions. It also leaves local and HDFS files behind,
which can lead to "no space left on device" errors in long-running
scripts with many variables (e.g., DL with many epochs).

Furthermore, this patch hardens the buffer pool threshold management
and makes buffer pool logging thread-safe.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2ec6d4d7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2ec6d4d7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2ec6d4d7

Branch: refs/heads/master
Commit: 2ec6d4d7cb2a61e77b31aa6a7f408b1b249c0354
Parents: 836086e
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 5 15:06:19 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Apr 5 16:30:53 2018 -0700

----------------------------------------------------------------------
 .../runtime/controlprogram/ProgramBlock.java    |  3 ++
 .../controlprogram/caching/LazyWriteBuffer.java | 29 ++++++-----
 .../controlprogram/caching/MatrixObject.java    |  2 +-
 .../context/SparkExecutionContext.java          | 54 ++++++++++----------
 .../cp/FunctionCallCPInstruction.java           | 10 ++--
 5 files changed, 50 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
index 308c6a5..b9a5133 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ProgramBlock.java
@@ -310,6 +310,9 @@ public class ProgramBlock implements ParseInfo
                                mo.release();
                                moNew.release();
                                moNew.setUpdateType(UpdateType.INPLACE);
+                               //cleanup old variable (e.g., remove from 
buffer pool)
+                               if( ec.removeVariable(varname) != null )
+                                       ec.cleanupCacheableData(mo);
                                ec.setVariable(varname, moNew);
                        }
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
index b728b38..ad97c4b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -226,21 +226,22 @@ public class LazyWriteBuffer
        {
                System.out.println("WRITE BUFFER STATUS ("+position+") --");
                
-               //print buffer meta data
-               System.out.println("\tWB: Buffer Meta Data: " +
-                                    "limit="+_limit+", " +
-                                    "size[bytes]="+_size+", " +
-                                    
"size[elements]="+_mQueue.size()+"/"+_mQueue.size());
-               
-               //print current buffer entries
-               int count = _mQueue.size();
-               for( Entry<String, ByteBuffer> entry : _mQueue.entrySet() )
-               {
-                       String fname = entry.getKey();
-                       ByteBuffer bbuff = entry.getValue();
+               synchronized( _mQueue ) {
+                       //print buffer meta data
+                       System.out.println("\tWB: Buffer Meta Data: " +
+                               "limit="+_limit+", " +
+                               "size[bytes]="+_size+", " +
+                               
"size[elements]="+_mQueue.size()+"/"+_mQueue.size());
                        
-                       System.out.println("\tWB: buffer element ("+count+"): 
"+fname+", "+bbuff.getSize()+", "+bbuff.isShallow());
-                       count--;
+                       //print current buffer entries
+                       int count = _mQueue.size();
+                       for( Entry<String, ByteBuffer> entry : 
_mQueue.entrySet() ) {
+                               String fname = entry.getKey();
+                               ByteBuffer bbuff = entry.getValue();
+                               System.out.println("\tWB: buffer element 
("+count+"): "
+                                       +fname+", "+bbuff.getSize()+", 
"+bbuff.isShallow());
+                               count--;
+                       }
                }
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 9714a19..678ddc0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -406,7 +406,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                
        @Override
        protected boolean isBelowCachingThreshold() {
-               return super.isBelowCachingThreshold()
+               return LazyWriteBuffer.getCacheBlockSize(_data) <= 
CACHING_THRESHOLD
                        || getUpdateType() == UpdateType.INPLACE_PINNED;
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index a7211ce..98c3eaa 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1055,46 +1055,44 @@ public class SparkExecutionContext extends 
ExecutionContext
        }
 
        @Override
-       public void cleanupCacheableData( CacheableData<?> mo )
+       public void cleanupCacheableData(CacheableData<?> mo)
        {
                //NOTE: this method overwrites the default behavior of 
cleanupMatrixObject
                //and hence is transparently used by rmvar instructions and 
other users. The
                //core difference is the lineage-based cleanup of RDD and 
broadcast variables.
 
+               if( !mo.isCleanupEnabled() )
+                       return;
+               
                try
                {
-                       if ( mo.isCleanupEnabled() )
-                       {
-                               //compute ref count only if matrix cleanup 
actually necessary
-                               if ( !getVariables().hasReferences(mo) )
-                               {
-                                       //clean cached data
-                                       mo.clearData();
-
-                                       //clean hdfs data if no pending rdd 
operations on it
-                                       if( mo.isHDFSFileExists() && 
mo.getFileName()!=null ) {
-                                               if( mo.getRDDHandle()==null ) {
-                                                       
MapReduceTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName());
-                                               }
-                                               else { //deferred file removal
-                                                       RDDObject rdd = 
mo.getRDDHandle();
-                                                       
rdd.setHDFSFilename(mo.getFileName());
-                                               }
-                                       }
-
-                                       //cleanup RDD and broadcast variables 
(recursive)
-                                       //note: requires that mo.clearData 
already removed back references
-                                       if( mo.getRDDHandle()!=null ) {
-                                               
rCleanupLineageObject(mo.getRDDHandle());
+                       //compute ref count only if matrix cleanup actually 
necessary
+                       if( !getVariables().hasReferences(mo) ) {
+                               //clean cached data
+                               mo.clearData();
+
+                               //clean hdfs data if no pending rdd operations 
on it
+                               if( mo.isHDFSFileExists() && 
mo.getFileName()!=null ) {
+                                       if( mo.getRDDHandle()==null ) {
+                                               
MapReduceTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName());
                                        }
-                                       if( mo.getBroadcastHandle()!=null ) {
-                                               
rCleanupLineageObject(mo.getBroadcastHandle());
+                                       else { //deferred file removal
+                                               RDDObject rdd = 
mo.getRDDHandle();
+                                               
rdd.setHDFSFilename(mo.getFileName());
                                        }
                                }
+
+                               //cleanup RDD and broadcast variables 
(recursive)
+                               //note: requires that mo.clearData already 
removed back references
+                               if( mo.getRDDHandle()!=null ) {
+                                       
rCleanupLineageObject(mo.getRDDHandle());
+                               }
+                               if( mo.getBroadcastHandle()!=null ) {
+                                       
rCleanupLineageObject(mo.getBroadcastHandle());
+                               }
                        }
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2ec6d4d7/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
index 588d2ff..dea48bc 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/FunctionCallCPInstruction.java
@@ -21,7 +21,6 @@ package org.apache.sysml.runtime.instructions.cp;
 
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Map.Entry;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.lops.Lop;
@@ -166,12 +165,13 @@ public class FunctionCallCPInstruction extends 
CPInstruction {
                        expectRetVars.add(di.getName());
                
                LocalVariableMap retVars = fn_ec.getVariables();
-               for( Entry<String,Data> var : retVars.entrySet() ) {
-                       if( expectRetVars.contains(var.getKey()) )
+               for( String varName : new ArrayList<>(retVars.keySet()) ) {
+                       if( expectRetVars.contains(varName) )
                                continue;
                        //cleanup unexpected return values to avoid leaks
-                       if( var.getValue() instanceof CacheableData )
-                               
fn_ec.cleanupCacheableData((CacheableData<?>)var.getValue());
+                       Data var = fn_ec.removeVariable(varName);
+                       if( var instanceof CacheableData )
+                               
fn_ec.cleanupCacheableData((CacheableData<?>)var);
                }
                
                // Unpin the pinned variables

Reply via email to