[SYSTEMML-2188] Fix unnecessary evictions to local fs on rdd collect

This patch improves the performance of large RDD collect operations. For
robustness against potential OOMs, we already had functionality for
guarded collects, which write the RDD to HDFS and read it back into
memory instead of collecting it, because a collect requires twice the
memory of a simple read. However, there are scenarios where we collect
an RDD and, because its size exceeds the buffer pool, immediately evict
it to the local file system in a single-threaded manner.

This patch explicitly considers the buffer pool limit when deciding
whether to perform an in-memory collect. Furthermore, it includes some
minor refactorings as a precondition for additional improvements
regarding unnecessary evictions.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/e1efd844
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/e1efd844
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/e1efd844

Branch: refs/heads/master
Commit: e1efd844df27f777a24a7c37cdace4d62e6ae019
Parents: 9db5c0a
Author: Matthias Boehm <[email protected]>
Authored: Fri Mar 16 01:22:33 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Mar 16 01:28:16 2018 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   | 30 +++++++++-----------
 .../opt/PlanSelectionFuseCostBasedV2.java       |  2 +-
 .../hops/cost/CostEstimatorStaticRuntime.java   |  4 +--
 .../controlprogram/caching/LazyWriteBuffer.java | 23 ++++++++++++---
 .../controlprogram/caching/MatrixObject.java    |  2 +-
 5 files changed, 37 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index f21fe46..274abad 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -43,6 +43,7 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -449,15 +450,12 @@ public class OptimizerUtils
                return ( size < memBudgetExec && 2*size < memBudgetLocal );
        }
 
-       public static boolean checkSparkBroadcastMemoryBudget( long rlen, long 
clen, long brlen, long bclen, long nnz )
-       {
+       public static boolean checkSparkBroadcastMemoryBudget( long rlen, long 
clen, long brlen, long bclen, long nnz ) {
                double memBudgetExec = 
SparkExecutionContext.getBroadcastMemoryBudget();
                double memBudgetLocal = OptimizerUtils.getLocalMemBudget();
-
                double sp = getSparsity(rlen, clen, nnz);
                double size = estimateSizeExactSparsity(rlen, clen, sp);
                double sizeP = estimatePartitionedSizeExactSparsity(rlen, clen, 
brlen, bclen, sp);
-               
                //basic requirement: the broadcast needs to to fit once in the 
remote broadcast memory 
                //and twice into the local memory budget because we have to 
create a partitioned broadcast
                //memory and hand it over to the spark context as in-memory 
object
@@ -465,25 +463,25 @@ public class OptimizerUtils
                                && sizeP < memBudgetExec && size+sizeP < 
memBudgetLocal );
        }
 
-       public static boolean checkSparkCollectMemoryBudget( 
MatrixCharacteristics mc, long memPinned )
-       {
-               return checkSparkCollectMemoryBudget(
-                               mc.getRows(), 
-                               mc.getCols(),
-                               mc.getRowsPerBlock(),
-                               mc.getColsPerBlock(),
-                               mc.getNonZerosBound(), memPinned);
+       public static boolean checkSparkCollectMemoryBudget( 
MatrixCharacteristics mc, long memPinned ) {
+               return checkSparkCollectMemoryBudget(mc.getRows(), 
mc.getCols(), mc.getRowsPerBlock(),
+                       mc.getColsPerBlock(), mc.getNonZerosBound(), memPinned, 
false);
        }
        
-       public static boolean checkSparkCollectMemoryBudget( long rlen, long 
clen, int brlen, int bclen, long nnz, long memPinned )
-       {
+       public static boolean checkSparkCollectMemoryBudget( 
MatrixCharacteristics mc, long memPinned, boolean checkBP ) {
+               return checkSparkCollectMemoryBudget(mc.getRows(), 
mc.getCols(), mc.getRowsPerBlock(),
+                       mc.getColsPerBlock(), mc.getNonZerosBound(), memPinned, 
checkBP);
+       }
+       
+       private static boolean checkSparkCollectMemoryBudget( long rlen, long 
clen, int brlen, int bclen, long nnz, long memPinned, boolean checkBP ) {
                //compute size of output matrix and its blocked representation
                double sp = getSparsity(rlen, clen, nnz);
                double memMatrix = estimateSizeExactSparsity(rlen, clen, sp);
                double memPMatrix = estimatePartitionedSizeExactSparsity(rlen, 
clen, brlen, bclen, sp);
-               
                //check if both output matrix and partitioned matrix fit into 
local mem budget
-               return (memPinned + memMatrix + memPMatrix < 
getLocalMemBudget());
+               return (memPinned + memMatrix + memPMatrix < 
getLocalMemBudget())
+               //check if the output matrix fits into the write buffer to 
avoid unnecessary evictions
+                       && (!checkBP || memMatrix < 
LazyWriteBuffer.getWriteBufferLimit());
        }
 
        public static boolean checkSparseBlockCSRConversion( 
MatrixCharacteristics mcIn ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
 
b/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
index e334d8d..76735ea 100644
--- 
a/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
+++ 
b/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
@@ -984,7 +984,7 @@ public class PlanSelectionFuseCostBasedV2 extends 
PlanSelection
                                tmpCosts *= driver.dimsKnown(true) ? 
driver.getSparsity() : SPARSE_SAFE_SPARSITY_EST;
                        //write correction for known evictions in CP
                        else if( memInputs <= OptimizerUtils.getLocalMemBudget()
-                               && sumTmpInputOutputSize(memo, costVect)*8 > 
LazyWriteBuffer.getWriteBufferSize() )
+                               && sumTmpInputOutputSize(memo, costVect)*8 > 
LazyWriteBuffer.getWriteBufferLimit() )
                                tmpCosts += costVect.outSize * 8 / 
WRITE_BANDWIDTH_IO;
                        costs += tmpCosts;
                        if( LOG.isTraceEnabled() ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java 
b/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
index e6ff4be..1d4074d 100644
--- a/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
+++ b/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java
@@ -110,7 +110,7 @@ public class CostEstimatorStaticRuntime extends 
CostEstimator
                        ltime += getHDFSReadTime( vs[0]._rlen, vs[0]._clen, 
vs[0].getSparsity() );
                        //eviction costs
                        if( CacheableData.CACHING_WRITE_CACHE_ON_READ &&
-                               
LazyWriteBuffer.getWriteBufferSize()<MatrixBlock.estimateSizeOnDisk(vs[0]._rlen,
 vs[0]._clen, (long)((vs[0]._nnz<0)? vs[0]._rlen*vs[0]._clen:vs[0]._nnz)) )
+                               
LazyWriteBuffer.getWriteBufferLimit()<MatrixBlock.estimateSizeOnDisk(vs[0]._rlen,
 vs[0]._clen, (long)((vs[0]._nnz<0)? vs[0]._rlen*vs[0]._clen:vs[0]._nnz)) )
                        {
                                ltime += Math.abs( getFSWriteTime( vs[0]._rlen, 
vs[0]._clen, vs[0].getSparsity() ));
                        }
@@ -120,7 +120,7 @@ public class CostEstimatorStaticRuntime extends 
CostEstimator
                        ltime += getHDFSReadTime( vs[1]._rlen, vs[1]._clen, 
vs[1].getSparsity() );
                        //eviction costs
                        if( CacheableData.CACHING_WRITE_CACHE_ON_READ &&
-                               
LazyWriteBuffer.getWriteBufferSize()<MatrixBlock.estimateSizeOnDisk(vs[1]._rlen,
 vs[1]._clen, (long)((vs[1]._nnz<0)? vs[1]._rlen*vs[1]._clen:vs[1]._nnz)) )
+                               
LazyWriteBuffer.getWriteBufferLimit()<MatrixBlock.estimateSizeOnDisk(vs[1]._rlen,
 vs[1]._clen, (long)((vs[1]._nnz<0)? vs[1]._rlen*vs[1]._clen:vs[1]._nnz)) )
                        {
                                ltime += Math.abs( getFSWriteTime( vs[1]._rlen, 
vs[1]._clen, vs[1].getSparsity()) );
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
index d5214a0..b728b38 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -60,8 +60,7 @@ public class LazyWriteBuffer
                throws IOException
        {
                //obtain basic meta data of cache block
-               long lSize = cb.isShallowSerialize() ?
-                       cb.getInMemorySize() : cb.getExactSerializedSize();
+               long lSize = getCacheBlockSize(cb);
                boolean requiresWrite = (lSize > _limit        //global buffer 
limit
                        || !ByteBuffer.isValidCapacity(lSize, cb)); //local 
buffer limit
                int numEvicted = 0;
@@ -196,9 +195,25 @@ public class LazyWriteBuffer
                        PageCache.clear();
        }
 
+       public static long getWriteBufferLimit() {
+               //return constant limit because 
InfrastructureAnalyzer.getLocalMaxMemory() is
+               //dynamically adjusted in a parfor context, which wouldn't 
reflect the actual size
+               return _limit;
+       }
+       
        public static long getWriteBufferSize() {
-               long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
-               return (long)(CacheableData.CACHING_BUFFER_SIZE * maxMem);
+               synchronized( _mQueue ) {
+                       return _size; }
+       }
+       
+       public static long getWriteBufferFree() {
+               synchronized( _mQueue ) {
+                       return _limit - _size; }
+       }
+       
+       public static long getCacheBlockSize(CacheBlock cb) {
+               return cb.isShallowSerialize() ?
+                       cb.getInMemorySize() : cb.getExactSerializedSize();
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index d0d7347..8bf8efd 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -476,7 +476,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                        
                        //guarded rdd collect 
                        if( ii == InputInfo.BinaryBlockInputInfo && //guarded 
collect not for binary cell
-                               
!OptimizerUtils.checkSparkCollectMemoryBudget(mc, 
getPinnedSize()+getBroadcastSize()) ) {
+                               
!OptimizerUtils.checkSparkCollectMemoryBudget(mc, 
getPinnedSize()+getBroadcastSize(), true) ) {
                                //write RDD to hdfs and read to prevent invalid 
collect mem consumption 
                                //note: lazy, partition-at-a-time collect 
(toLocalIterator) was significantly slower
                                if( 
!MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing 
file

Reply via email to