[SYSTEMML-382] MCSR-CSR sparse block conversion on rdd repartition-cache

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2d32f6d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2d32f6d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2d32f6d5

Branch: refs/heads/master Commit: 2d32f6d5812fba456bbae432c73c5da81a8c3e72 Parents: 1b29283 Author: Matthias Boehm <[email protected]> Authored: Sat Jan 23 19:50:13 2016 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Jan 23 19:50:13 2016 -0800 ---------------------------------------------------------------------- src/main/java/org/apache/sysml/hops/OptimizerUtils.java | 11 +++++++++++ .../controlprogram/context/SparkExecutionContext.java | 11 ++++++++++- .../instructions/spark/CheckpointSPInstruction.java | 5 +---- 3 files changed, 22 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 57d38d7..80ef5f0 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -31,6 +31,7 @@ import org.apache.sysml.hops.Hop.DataOpTypes; import org.apache.sysml.hops.Hop.FileFormatTypes; import org.apache.sysml.hops.Hop.OpOp2; import org.apache.sysml.hops.rewrite.HopRewriteUtils; +import org.apache.sysml.lops.Checkpoint; import org.apache.sysml.lops.LopProperties.ExecType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; @@ -508,6 +509,16 @@ public class OptimizerUtils } /** + * + * @param mcIn + * @return + */ + public static boolean checkSparseBlockCSRConversion( MatrixCharacteristics mcIn ) { + return Checkpoint.CHECKPOINT_SPARSE_CSR + && OptimizerUtils.getSparsity(mcIn) < MatrixBlock.SPARSITY_TURN_POINT; + } + + /** * Returns the number of reducers that potentially run in parallel. 
* This is either just the configured value (SystemML config) or * the minimum of configured value and available reduce slots. http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index b46a3af..45cb948 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -57,6 +57,7 @@ import org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction; import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction; import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction; +import org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -65,6 +66,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.utils.Statistics; @@ -1037,8 +1039,15 @@ public class SparkExecutionContext extends ExecutionContext in = in.coalesce( numPartitions ); } - 
//repartition and persist rdd (force creation of shuffled rdd via merge) + //repartition rdd (force creation of shuffled rdd via merge) JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in); + + //convert mcsr into memory-efficient csr if potentially sparse + if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) { + out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR)); + } + + //persist rdd in default storage level out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL ) .count(); //trigger caching to prevent contention http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2d32f6d5/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java index 6a0aa34..ee25052 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java @@ -22,7 +22,6 @@ package org.apache.sysml.runtime.instructions.spark; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.storage.StorageLevel; import org.apache.sysml.hops.OptimizerUtils; -import org.apache.sysml.lops.Checkpoint; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.DMLUnsupportedOperationException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -114,9 +113,7 @@ public class CheckpointSPInstruction extends UnarySPInstruction } //convert mcsr into memory-efficient csr if potentially sparse - if( OptimizerUtils.getSparsity(mcIn) < MatrixBlock.SPARSITY_TURN_POINT - && Checkpoint.CHECKPOINT_SPARSE_CSR ) - { + if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) { out = out.mapValues(new 
CreateSparseBlockFunction(SparseBlock.Type.CSR)); }
