Repository: systemml
Updated Branches:
  refs/heads/master 50a895f86 -> 32620f294


[SYSTEMML-1922] Fix memory-efficiency spark broadcasts, OOM AutoEncoder

This patch fixes OOM issues on our staging/autoencoder-2layer.dml
script, which revealed a general issue of unaccounted broadcast memory
requirements. In detail, Spark keeps a deep copy of the blockified
broadcast as well as a reference to the passed object. So far we did not
account for the latter, which keeps this object in-memory even though
it is evicted and put onto a soft reference.

We now explicitly clear blocks (except in local mode) after the
broadcast has been created (and thus blockified) because remote fetches
only reference the blockified chunks. 


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/a3407ae7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/a3407ae7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/a3407ae7

Branch: refs/heads/master
Commit: a3407ae7a9d8156225db28d057a9e5db2cbc2c90
Parents: 50a895f
Author: Matthias Boehm <[email protected]>
Authored: Wed Sep 20 15:50:41 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Sep 21 11:44:27 2017 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 34 +++++++++++++-------
 .../spark/data/PartitionedBlock.java            |  4 +++
 2 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/a3407ae7/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index e0352b0..966049c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -264,6 +264,10 @@ public class SparkExecutionContext extends ExecutionContext
 
                return conf;
        }
+       
+       public static boolean isLocalMaster() {
+               return getSparkContextStatic().isLocal();
+       }
 
        /**
         * Spark instructions should call this for all matrix inputs except 
broadcast
@@ -491,21 +495,19 @@ public class SparkExecutionContext extends 
ExecutionContext
 
                return rdd;
        }
-
-       /**
-        * TODO So far we only create broadcast variables but never destroy
-        * them. This is a memory leak which might lead to executor 
out-of-memory.
-        * However, in order to handle this, we need to keep track when 
broadcast
-        * variables are no longer required.
-        *
-        * @param varname variable name
-        * @return wrapper for broadcast variables
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
+       
        @SuppressWarnings("unchecked")
        public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( 
String varname )
                throws DMLRuntimeException
        {
+               //NOTE: The memory consumption of this method is the in-memory 
size of the 
+               //matrix object plus the partitioned size in 1k-1k blocks. 
Since the call
+               //to broadcast happens after the matrix object has been 
released, the memory
+               //requirements of blockified chunks in Spark's block manager 
are covered under
+               //this maximum. Also note that we explicitly clear the 
in-memory blocks once
+               //the broadcasts are created (other than in local mode) in 
order to avoid 
+               //unnecessary memory requirements during the lifetime of this 
broadcast handle.
+               
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
                MatrixObject mo = getMatrixObject(varname);
@@ -547,12 +549,16 @@ public class SparkExecutionContext extends 
ExecutionContext
                                        int numBlks = Math.min(numPerPart, 
pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
                                        PartitionedBlock<MatrixBlock> tmp = 
pmb.createPartition(offset, numBlks, new MatrixBlock());
                                        ret[i] = 
getSparkContext().broadcast(tmp);
+                                       if( !isLocalMaster() )
+                                               tmp.clearBlocks();
                                }
                        }
                        else { //single partition
                                ret[0] = getSparkContext().broadcast(pmb);
+                               if( !isLocalMaster() )
+                                       pmb.clearBlocks();
                        }
-
+                       
                        bret = new PartitionedBroadcast<MatrixBlock>(ret);
                        BroadcastObject<MatrixBlock> bchandle = new 
BroadcastObject<MatrixBlock>(bret, varname,
                                        
OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics()));
@@ -613,10 +619,14 @@ public class SparkExecutionContext extends 
ExecutionContext
                                        int numBlks = Math.min(numPerPart, 
pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
                                        PartitionedBlock<FrameBlock> tmp = 
pmb.createPartition(offset, numBlks, new FrameBlock());
                                        ret[i] = 
getSparkContext().broadcast(tmp);
+                                       if( !isLocalMaster() )
+                                               tmp.clearBlocks();
                                }
                        }
                        else { //single partition
                                ret[0] = getSparkContext().broadcast(pmb);
+                               if( !isLocalMaster() )
+                                       pmb.clearBlocks();
                        }
 
                        bret = new PartitionedBroadcast<FrameBlock>(ret);

http://git-wip-us.apache.org/repos/asf/systemml/blob/a3407ae7/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index ca9cde4..08257f6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -254,6 +254,10 @@ public class PartitionedBlock<T extends CacheBlock> 
implements Externalizable
                        }
                }
        }
+       
+       public void clearBlocks() {
+               _partBlocks = null;
+       }
 
        /**
         * Redirects the default java serialization via externalizable to our 
default 

Reply via email to