Repository: systemml Updated Branches: refs/heads/master 50a895f86 -> 32620f294
[SYSTEMML-1922] Fix memory-efficiency spark broadcasts, OOM AutoEncoder This patch fixes OOM issues on our staging/autoencoder-2layer.dml script, which revealed a general issue of unaccounted broadcast memory requirements. In detail, Spark keeps a deep copy of the blockified broadcast as well as a reference to the passed object. So far we did not account for the latter, which keeps this object in-memory even though it is evicted and put onto a soft reference. We now explicitly clear blocks (except in local mode) after the broadcast has been created (and thus blockified) because remote fetches only reference the blockified chunks. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/a3407ae7 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/a3407ae7 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/a3407ae7 Branch: refs/heads/master Commit: a3407ae7a9d8156225db28d057a9e5db2cbc2c90 Parents: 50a895f Author: Matthias Boehm <[email protected]> Authored: Wed Sep 20 15:50:41 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Sep 21 11:44:27 2017 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 34 +++++++++++++------- .../spark/data/PartitionedBlock.java | 4 +++ 2 files changed, 26 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/a3407ae7/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index e0352b0..966049c 100644 --- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -264,6 +264,10 @@ public class SparkExecutionContext extends ExecutionContext return conf; } + + public static boolean isLocalMaster() { + return getSparkContextStatic().isLocal(); + } /** * Spark instructions should call this for all matrix inputs except broadcast @@ -491,21 +495,19 @@ public class SparkExecutionContext extends ExecutionContext return rdd; } - - /** - * TODO So far we only create broadcast variables but never destroy - * them. This is a memory leak which might lead to executor out-of-memory. - * However, in order to handle this, we need to keep track when broadcast - * variables are no longer required. - * - * @param varname variable name - * @return wrapper for broadcast variables - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ + @SuppressWarnings("unchecked") public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname ) throws DMLRuntimeException { + //NOTE: The memory consumption of this method is the in-memory size of the + //matrix object plus the partitioned size in 1k-1k blocks. Since the call + //to broadcast happens after the matrix object has been released, the memory + //requirements of blockified chunks in Spark's block manager are covered under + //this maximum. Also note that we explicitly clear the in-memory blocks once + //the broadcasts are created (other than in local mode) in order to avoid + //unnecessary memory requirements during the lifetime of this broadcast handle. + long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; MatrixObject mo = getMatrixObject(varname); @@ -547,12 +549,16 @@ public class SparkExecutionContext extends ExecutionContext int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset); PartitionedBlock<MatrixBlock> tmp = pmb.createPartition(offset, numBlks, new MatrixBlock()); ret[i] = getSparkContext().broadcast(tmp); + if( !isLocalMaster() ) + tmp.clearBlocks(); } } else { //single partition ret[0] = getSparkContext().broadcast(pmb); + if( !isLocalMaster() ) + pmb.clearBlocks(); } - + bret = new PartitionedBroadcast<MatrixBlock>(ret); BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<MatrixBlock>(bret, varname, OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics())); @@ -613,10 +619,14 @@ public class SparkExecutionContext extends ExecutionContext int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset); PartitionedBlock<FrameBlock> tmp = pmb.createPartition(offset, numBlks, new FrameBlock()); ret[i] = getSparkContext().broadcast(tmp); + if( !isLocalMaster() ) + tmp.clearBlocks(); } } else { //single partition ret[0] = getSparkContext().broadcast(pmb); + if( !isLocalMaster() ) + pmb.clearBlocks(); } bret = new PartitionedBroadcast<FrameBlock>(ret); http://git-wip-us.apache.org/repos/asf/systemml/blob/a3407ae7/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java index ca9cde4..08257f6 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java @@ -254,6 +254,10 @@ public class PartitionedBlock<T extends CacheBlock> implements 
Externalizable } } } + + public void clearBlocks() { + _partBlocks = null; + } /** * Redirects the default java serialization via externalizable to our default
