[SYSTEMML-2188] Fix unnecessary evictions to local fs on rdd collect This patch improves the performance of large RDD collect operations. For robustness against potential OOMs, we already had functionality for guarded collects, which write the RDD to HDFS and read it back into memory instead of collecting it, because a collect requires twice the memory of a simple read. However, there are scenarios where we collect an RDD and, because its size exceeds the buffer pool, immediately evict it to the local file system in a single-threaded manner.
This patch explicitly considers the buffer pool limit when deciding for in-memory collect. Furthermore, this also includes some minor refactorings as a precondition for additional improvements regarding unnecessary evictions. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/e1efd844 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/e1efd844 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/e1efd844 Branch: refs/heads/master Commit: e1efd844df27f777a24a7c37cdace4d62e6ae019 Parents: 9db5c0a Author: Matthias Boehm <[email protected]> Authored: Fri Mar 16 01:22:33 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Mar 16 01:28:16 2018 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/hops/OptimizerUtils.java | 30 +++++++++----------- .../opt/PlanSelectionFuseCostBasedV2.java | 2 +- .../hops/cost/CostEstimatorStaticRuntime.java | 4 +-- .../controlprogram/caching/LazyWriteBuffer.java | 23 ++++++++++++--- .../controlprogram/caching/MatrixObject.java | 2 +- 5 files changed, 37 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index f21fe46..274abad 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -43,6 +43,7 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ForProgramBlock; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; +import 
org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -449,15 +450,12 @@ public class OptimizerUtils return ( size < memBudgetExec && 2*size < memBudgetLocal ); } - public static boolean checkSparkBroadcastMemoryBudget( long rlen, long clen, long brlen, long bclen, long nnz ) - { + public static boolean checkSparkBroadcastMemoryBudget( long rlen, long clen, long brlen, long bclen, long nnz ) { double memBudgetExec = SparkExecutionContext.getBroadcastMemoryBudget(); double memBudgetLocal = OptimizerUtils.getLocalMemBudget(); - double sp = getSparsity(rlen, clen, nnz); double size = estimateSizeExactSparsity(rlen, clen, sp); double sizeP = estimatePartitionedSizeExactSparsity(rlen, clen, brlen, bclen, sp); - //basic requirement: the broadcast needs to to fit once in the remote broadcast memory //and twice into the local memory budget because we have to create a partitioned broadcast //memory and hand it over to the spark context as in-memory object @@ -465,25 +463,25 @@ public class OptimizerUtils && sizeP < memBudgetExec && size+sizeP < memBudgetLocal ); } - public static boolean checkSparkCollectMemoryBudget( MatrixCharacteristics mc, long memPinned ) - { - return checkSparkCollectMemoryBudget( - mc.getRows(), - mc.getCols(), - mc.getRowsPerBlock(), - mc.getColsPerBlock(), - mc.getNonZerosBound(), memPinned); + public static boolean checkSparkCollectMemoryBudget( MatrixCharacteristics mc, long memPinned ) { + return checkSparkCollectMemoryBudget(mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), + mc.getColsPerBlock(), mc.getNonZerosBound(), memPinned, false); } - public static boolean checkSparkCollectMemoryBudget( long rlen, long clen, int brlen, int bclen, long nnz, long memPinned ) - { + public static boolean 
checkSparkCollectMemoryBudget( MatrixCharacteristics mc, long memPinned, boolean checkBP ) { + return checkSparkCollectMemoryBudget(mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), + mc.getColsPerBlock(), mc.getNonZerosBound(), memPinned, checkBP); + } + + private static boolean checkSparkCollectMemoryBudget( long rlen, long clen, int brlen, int bclen, long nnz, long memPinned, boolean checkBP ) { //compute size of output matrix and its blocked representation double sp = getSparsity(rlen, clen, nnz); double memMatrix = estimateSizeExactSparsity(rlen, clen, sp); double memPMatrix = estimatePartitionedSizeExactSparsity(rlen, clen, brlen, bclen, sp); - //check if both output matrix and partitioned matrix fit into local mem budget - return (memPinned + memMatrix + memPMatrix < getLocalMemBudget()); + return (memPinned + memMatrix + memPMatrix < getLocalMemBudget()) + //check if the output matrix fits into the write buffer to avoid unnecessary evictions + && (!checkBP || memMatrix < LazyWriteBuffer.getWriteBufferLimit()); } public static boolean checkSparseBlockCSRConversion( MatrixCharacteristics mcIn ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java b/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java index e334d8d..76735ea 100644 --- a/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java +++ b/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java @@ -984,7 +984,7 @@ public class PlanSelectionFuseCostBasedV2 extends PlanSelection tmpCosts *= driver.dimsKnown(true) ? 
driver.getSparsity() : SPARSE_SAFE_SPARSITY_EST; //write correction for known evictions in CP else if( memInputs <= OptimizerUtils.getLocalMemBudget() - && sumTmpInputOutputSize(memo, costVect)*8 > LazyWriteBuffer.getWriteBufferSize() ) + && sumTmpInputOutputSize(memo, costVect)*8 > LazyWriteBuffer.getWriteBufferLimit() ) tmpCosts += costVect.outSize * 8 / WRITE_BANDWIDTH_IO; costs += tmpCosts; if( LOG.isTraceEnabled() ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java b/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java index e6ff4be..1d4074d 100644 --- a/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java +++ b/src/main/java/org/apache/sysml/hops/cost/CostEstimatorStaticRuntime.java @@ -110,7 +110,7 @@ public class CostEstimatorStaticRuntime extends CostEstimator ltime += getHDFSReadTime( vs[0]._rlen, vs[0]._clen, vs[0].getSparsity() ); //eviction costs if( CacheableData.CACHING_WRITE_CACHE_ON_READ && - LazyWriteBuffer.getWriteBufferSize()<MatrixBlock.estimateSizeOnDisk(vs[0]._rlen, vs[0]._clen, (long)((vs[0]._nnz<0)? vs[0]._rlen*vs[0]._clen:vs[0]._nnz)) ) + LazyWriteBuffer.getWriteBufferLimit()<MatrixBlock.estimateSizeOnDisk(vs[0]._rlen, vs[0]._clen, (long)((vs[0]._nnz<0)? vs[0]._rlen*vs[0]._clen:vs[0]._nnz)) ) { ltime += Math.abs( getFSWriteTime( vs[0]._rlen, vs[0]._clen, vs[0].getSparsity() )); } @@ -120,7 +120,7 @@ public class CostEstimatorStaticRuntime extends CostEstimator ltime += getHDFSReadTime( vs[1]._rlen, vs[1]._clen, vs[1].getSparsity() ); //eviction costs if( CacheableData.CACHING_WRITE_CACHE_ON_READ && - LazyWriteBuffer.getWriteBufferSize()<MatrixBlock.estimateSizeOnDisk(vs[1]._rlen, vs[1]._clen, (long)((vs[1]._nnz<0)? 
vs[1]._rlen*vs[1]._clen:vs[1]._nnz)) ) + LazyWriteBuffer.getWriteBufferLimit()<MatrixBlock.estimateSizeOnDisk(vs[1]._rlen, vs[1]._clen, (long)((vs[1]._nnz<0)? vs[1]._rlen*vs[1]._clen:vs[1]._nnz)) ) { ltime += Math.abs( getFSWriteTime( vs[1]._rlen, vs[1]._clen, vs[1].getSparsity()) ); } http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java index d5214a0..b728b38 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/LazyWriteBuffer.java @@ -60,8 +60,7 @@ public class LazyWriteBuffer throws IOException { //obtain basic meta data of cache block - long lSize = cb.isShallowSerialize() ? - cb.getInMemorySize() : cb.getExactSerializedSize(); + long lSize = getCacheBlockSize(cb); boolean requiresWrite = (lSize > _limit //global buffer limit || !ByteBuffer.isValidCapacity(lSize, cb)); //local buffer limit int numEvicted = 0; @@ -196,9 +195,25 @@ public class LazyWriteBuffer PageCache.clear(); } + public static long getWriteBufferLimit() { + //return constant limit because InfrastructureAnalyzer.getLocalMaxMemory() is + //dynamically adjusted in a parfor context, which wouldn't reflect the actual size + return _limit; + } + public static long getWriteBufferSize() { - long maxMem = InfrastructureAnalyzer.getLocalMaxMemory(); - return (long)(CacheableData.CACHING_BUFFER_SIZE * maxMem); + synchronized( _mQueue ) { + return _size; } + } + + public static long getWriteBufferFree() { + synchronized( _mQueue ) { + return _limit - _size; } + } + + public static long getCacheBlockSize(CacheBlock cb) { + return cb.isShallowSerialize() ? 
+ cb.getInMemorySize() : cb.getExactSerializedSize(); } /** http://git-wip-us.apache.org/repos/asf/systemml/blob/e1efd844/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index d0d7347..8bf8efd 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -476,7 +476,7 @@ public class MatrixObject extends CacheableData<MatrixBlock> //guarded rdd collect if( ii == InputInfo.BinaryBlockInputInfo && //guarded collect not for binary cell - !OptimizerUtils.checkSparkCollectMemoryBudget(mc, getPinnedSize()+getBroadcastSize()) ) { + !OptimizerUtils.checkSparkCollectMemoryBudget(mc, getPinnedSize()+getBroadcastSize(), true) ) { //write RDD to hdfs and read to prevent invalid collect mem consumption //note: lazy, partition-at-a-time collect (toLocalIterator) was significantly slower if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file
