Repository: incubator-systemml
Updated Branches:
  refs/heads/master 132a43d38 -> c360304eb
[SYSTEMML-1281] Fix memory efficiency of csv/dataset-binary rdd converters

This patch makes the following two memory-efficiency improvements to our csv-binaryblock and dataset-binaryblock matrix rdd converters:

(1) Shallow block copy on createCombiner for combineByKey to reduce the temporary memory consumption per task, especially for scenarios with almost no local aggregation. The shallow copy is safe, as the inputs are temporary partial blocks that are not accessible to any other operation.

(2) Explicitly controlled number of output partitions according to the output size in binary-block representation. So far we simply used the number of input partitions. For compressed dataset inputs, this led to unnecessarily large output partitions and thus memory pressure for subsequent tasks. For csv inputs, this led to unnecessarily small output partitions that were later coalesced to the preferred number of partitions; however, since coalesce only balances the number of merged partitions, this could lead to load imbalance. Both problems are now systematically solved at their root cause.

Finally, this patch also includes a number of minor cleanups with regard to probing the number of partitions, missing tests, etc.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c360304e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c360304e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c360304e

Branch: refs/heads/master
Commit: c360304ebbf2d05d25067bbded49a3fbdb1edaac
Parents: 132a43d
Author: Matthias Boehm <[email protected]>
Authored: Fri Feb 17 23:41:55 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Feb 17 23:56:12 2017 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  5 ++-
 .../instructions/gpu/context/JCudaContext.java  |  2 +-
 .../spark/CheckpointSPInstruction.java          | 18 ++---------
 .../instructions/spark/MapmmSPInstruction.java  |  4 +--
 .../spark/utils/RDDAggregateUtils.java          | 33 +++++++++++++++++---
 .../spark/utils/RDDConverterUtils.java          | 20 ++++++------
 .../instructions/spark/utils/SparkUtils.java    | 22 +++++++------
 .../functions/io/csv/ReadCSVTest.java           | 29 ++++++++++++++---
 8 files changed, 83 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
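For illustration, the core of both changes looks conceptually as follows. This is a minimal sketch with generic value types instead of the SystemML block classes; class and variable names here are illustrative only, and the actual implementation lives in RDDAggregateUtils.mergeByKey and SparkUtils.getNumPreferredPartitions in the diff below.

  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.api.java.function.Function2;

  public class MergeByKeySketch {
    //merge partial values per key with an explicitly chosen number of output partitions
    public static JavaPairRDD<Long, double[]> mergeByKey(
      JavaPairRDD<Long, double[]> in, int numPartitions, boolean deepCopyCombiner)
    {
      //createCombiner: deep copy only if the first value may be referenced elsewhere
      Function<double[], double[]> create = (double[] v) ->
        deepCopyCombiner ? v.clone() : v;
      //merge functions modify the combiner in place (assumes disjoint partial data)
      Function2<double[], double[], double[]> merge = (double[] a, double[] b) -> {
        for( int i=0; i<a.length; i++ )
          a[i] += b[i];
        return a;
      };
      return in.combineByKey(create, merge, merge, numPartitions);
    }
  }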
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 77bcc8d..b7fe6e8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -49,7 +49,6 @@ import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.instructions.spark.CheckpointSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.SPInstruction;
 import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysml.runtime.instructions.spark.data.LineageObject;
@@ -1181,8 +1180,8 @@ public class SparkExecutionContext extends ExecutionContext
 				((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
 
 			//investigate issue of unnecessarily large number of partitions
-			int numPartitions = CheckpointSPInstruction.getNumCoalescePartitions(mcIn, in);
-			if( numPartitions < in.partitions().size() )
+			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
+			if( numPartitions < in.getNumPartitions() )
 				in = in.coalesce( numPartitions );
 		}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
index af4cfbd..38f4e4c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaContext.java
@@ -133,7 +133,7 @@ public class JCudaContext extends GPUContext {
 		long free[] = {0};
 		long total[] = {0};
 		if (cudaMemGetInfo(free, total) == cudaSuccess) {
-			long totalNumBytes = total[0];
+			//long totalNumBytes = total[0];
 			deviceMemBytes.set(free[0]);
 		} else {
 			throw new RuntimeException("ERROR: Unable to get memory information of the GPU.");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index 952e43c..1fa30b6 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -27,7 +27,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.BooleanObject;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
@@ -103,8 +102,8 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		{
 			//(trigger coalesce if intended number of partitions exceeded by 20%
 			//and not hash partitioned to avoid losing the existing partitioner)
-			int numPartitions = getNumCoalescePartitions(mcIn, in);
-			boolean coalesce = ( 1.2*numPartitions < in.partitions().size()
+			int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
+			boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions()
 				&& !SparkUtils.isHashPartitioned(in) );
 
 			//checkpoint pre-processing rdd operations
@@ -157,17 +156,4 @@ public class CheckpointSPInstruction extends UnarySPInstruction
 		}
 		sec.setVariable( output.getName(), cd);
 	}
-
-	public static int getNumCoalescePartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in)
-	{
-		if( mc.dimsKnown(true) ) {
-			double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
-			double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
-			return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
-		}
-		else {
-			return in.partitions().size();
-		}
-	}
 }
-
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 310664f..33eacbe 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -135,8 +135,8 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		{
 			JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
 			if( requiresFlatMapFunction(_type, mcBc) ) {
-				if( requiresRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()) )
-					in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()));
+				if( requiresRepartitioning(_type, mcRdd, mcBc, in1.getNumPartitions()) )
+					in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.getNumPartitions()));
 				out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
 			}
 			else if( preservesPartitioning(mcRdd, _type) )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 8038157..61c950a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -155,13 +155,30 @@ public class RDDAggregateUtils
 	 * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
 	 * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
 	 */
-	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in )
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in ) {
+		return mergeByKey(in, in.getNumPartitions(), true);
+	}
+
+	/**
+	 * Merges disjoint data of all blocks per key.
+	 *
+	 * Note: The behavior of this method is undefined for both sparse and dense data if the
+	 * assumption of disjoint data is violated.
+	 *
+	 * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+	 * @param numPartitions number of output partitions
+	 * @param deepCopyCombiner indicator if the createCombiner functions needs to deep copy the input block
+	 * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+	 */
+	public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in,
+		int numPartitions, boolean deepCopyCombiner )
 	{
 		//use combine by key to avoid unnecessary deep block copies, i.e.
 		//create combiner block once and merge remaining blocks in-place.
-		return in.combineByKey( new CreateBlockCombinerFunction(),
+		return in.combineByKey(
+			new CreateBlockCombinerFunction(deepCopyCombiner),
 			new MergeBlocksFunction(false),
-			new MergeBlocksFunction(false) );
+			new MergeBlocksFunction(false), numPartitions );
 	}

 	/**
@@ -251,13 +268,19 @@ public class RDDAggregateUtils

 	private static class CreateBlockCombinerFunction implements Function<MatrixBlock, MatrixBlock>
 	{
 		private static final long serialVersionUID = 1987501624176848292L;
-
+
+		private final boolean _deep;
+
+		public CreateBlockCombinerFunction(boolean deep) {
+			_deep = deep;
+		}
+
 		@Override
 		public MatrixBlock call(MatrixBlock arg0)
 			throws Exception
 		{
 			//create deep copy of given block
-			return new MatrixBlock(arg0);
+			return _deep ? new MatrixBlock(arg0) : arg0;
 		}
 	}
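The deepCopyCombiner flag matters because the subsequent merge functions modify the combiner block in place; if the first block per key were reachable elsewhere (e.g., via a cached RDD), a shallow combiner would corrupt it. A minimal usage sketch under that assumption follows; variable names are illustrative and the import paths are assumed from this repository's package layout.

  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
  import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
  import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
  import org.apache.sysml.runtime.matrix.data.MatrixBlock;
  import org.apache.sysml.runtime.matrix.data.MatrixIndexes;

  class MergeUsageSketch {
    //partial blocks from a converter are job-local temporaries,
    //so the shallow combiner (deepCopyCombiner=false) is safe here
    static JavaPairRDD<MatrixIndexes, MatrixBlock> mergePartialBlocks(
      JavaPairRDD<MatrixIndexes, MatrixBlock> partial, MatrixCharacteristics mc)
    {
      int parts = SparkUtils.getNumPreferredPartitions(mc, partial);
      return RDDAggregateUtils.mergeByKey(partial, parts, false);
    }

    //generic callers keep the conservative default: deep copy of the first block per key
    static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeGeneric(
      JavaPairRDD<MatrixIndexes, MatrixBlock> blocks)
    {
      return RDDAggregateUtils.mergeByKey(blocks);
    }
  }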
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index e847471..d1e6793 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -185,10 +185,11 @@ public class RDDConverterUtils
 			prepinput.mapPartitionsToPair(new CSVToBinaryBlockFunction(
 				mc, sparse, hasHeader, delim, fill, fillValue));

-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out );
-
-		return out;
+		//aggregate partial matrix blocks (w/ preferred number of output
+		//partitions as the data is likely smaller in binary block format,
+		//but also to bound the size of partitions for compressed inputs)
+		int parts = SparkUtils.getNumPreferredPartitions(mc, out);
+		return RDDAggregateUtils.mergeByKey(out, parts, false);
 	}

 	/**
@@ -256,10 +257,11 @@ public class RDDConverterUtils
 			prepinput.mapPartitionsToPair(
 				new DataFrameToBinaryBlockFunction(mc, sparse, containsID, isVector));

-		//aggregate partial matrix blocks
-		out = RDDAggregateUtils.mergeByKey( out );
-
-		return out;
+		//aggregate partial matrix blocks (w/ preferred number of output
+		//partitions as the data is likely smaller in binary block format,
+		//but also to bound the size of partitions for compressed inputs)
+		int parts = SparkUtils.getNumPreferredPartitions(mc, out);
+		return RDDAggregateUtils.mergeByKey(out, parts, false);
 	}

 	public static Dataset<Row> binaryBlockToDataFrame(SparkSession sparkSession,
@@ -312,7 +314,7 @@ public class RDDConverterUtils
 		double datasize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
 		double rowsize = OptimizerUtils.estimatePartitionedSizeExactSparsity(1, mc.getCols(),
 			mc.getNumRowBlocks(), mc.getColsPerBlock(), Math.ceil((double)mc.getNonZeros()/mc.getRows()));
-		double partsize = Math.ceil(datasize/in.partitions().size());
+		double partsize = Math.ceil(datasize/in.getNumPartitions());
 		double blksz = Math.min(mc.getRows(), mc.getRowsPerBlock());
 		return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
 	}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index d27e37a..2fe3981 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -29,8 +29,10 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
@@ -62,12 +64,10 @@ public class SparkUtils
 		return new Tuple2<MatrixIndexes,MatrixBlock>(in.getIndexes(), (MatrixBlock)in.getValue());
 	}

-	public static ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlock( ArrayList<IndexedMatrixValue> in )
-	{
+	public static ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlock( ArrayList<IndexedMatrixValue> in ) {
 		ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
 		for( IndexedMatrixValue imv : in )
 			ret.add(fromIndexedMatrixBlock(imv));
-
 		return ret;
 	}

@@ -75,12 +75,10 @@ public class SparkUtils
 		return new Pair<MatrixIndexes,MatrixBlock>(in.getIndexes(), (MatrixBlock)in.getValue());
 	}

-	public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in )
-	{
+	public static ArrayList<Pair<MatrixIndexes,MatrixBlock>> fromIndexedMatrixBlockToPair( ArrayList<IndexedMatrixValue> in ) {
 		ArrayList<Pair<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
 		for( IndexedMatrixValue imv : in )
 			ret.add(fromIndexedMatrixBlockToPair(imv));
-
 		return ret;
 	}

@@ -88,12 +86,10 @@ public class SparkUtils
 		return new Tuple2<Long, FrameBlock>(in.getKey(), in.getValue());
 	}

-	public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( ArrayList<Pair<Long, FrameBlock>> in )
-	{
+	public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( ArrayList<Pair<Long, FrameBlock>> in ) {
 		ArrayList<Tuple2<Long, FrameBlock>> ret = new ArrayList<Tuple2<Long, FrameBlock>>();
 		for( Pair<Long, FrameBlock> ifv : in )
 			ret.add(fromIndexedFrameBlock(ifv));
-
 		return ret;
 	}

@@ -120,6 +116,14 @@ public class SparkUtils
 			&& in.rdd().partitioner().get() instanceof HashPartitioner;
 	}

+	public static int getNumPreferredPartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in) {
+		if( !mc.dimsKnown(true) )
+			return in.getNumPartitions();
+		double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
+		double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
+		return (int) Math.max(Math.ceil(matrixPSize/hdfsBlockSize), 1);
+	}
+
 	/**
 	 * Creates a partitioning-preserving deep copy of the input matrix RDD, where
 	 * the indexes and values are copied.
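For intuition on getNumPreferredPartitions: assuming the default 128 MB HDFS block size, a matrix whose estimated binary-block size is roughly 8 GB yields ceil(8192 MB / 128 MB) = 64 output partitions, independent of how many partitions the csv or dataset input happened to have. This bounds each output partition to roughly one HDFS block, which addresses both the too-large partitions of compressed dataset inputs and the too-small partitions of csv inputs described in the commit message above.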
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c360304e/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java b/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
index 65fbfa0..12d07ef 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/io/csv/ReadCSVTest.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.test.integration.functions.io.csv;

 import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.CompilerConfig;
 import org.apache.sysml.test.integration.AutomatedTestBase;
@@ -37,7 +38,6 @@ import org.apache.sysml.test.utils.TestUtils;

 public class ReadCSVTest extends AutomatedTestBase
 {
-
 	private final static String TEST_NAME = "ReadCSVTest";
 	private final static String TEST_DIR = "functions/io/csv/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + ReadCSVTest.class.getSimpleName() + "/";
@@ -78,6 +78,11 @@ public class ReadCSVTest extends AutomatedTestBase
 	}

 	@Test
+	public void testCSV1_SP() {
+		runCSVTest(1, RUNTIME_PLATFORM.SPARK, true);
+	}
+
+	@Test
 	public void testCSV2_Sequential_CP1() {
 		runCSVTest(2, RUNTIME_PLATFORM.SINGLE_NODE, false);
 	}
@@ -101,6 +106,11 @@ public class ReadCSVTest extends AutomatedTestBase
 	public void testCSV2_MR() {
 		runCSVTest(2, RUNTIME_PLATFORM.HADOOP, true);
 	}
+
+	@Test
+	public void testCSV2_SP() {
+		runCSVTest(2, RUNTIME_PLATFORM.SPARK, true);
+	}

 	@Test
 	public void testCSV3_Sequential_CP1() {
@@ -127,6 +137,11 @@ public class ReadCSVTest extends AutomatedTestBase
 		runCSVTest(3, RUNTIME_PLATFORM.HADOOP, false);
 	}

+	@Test
+	public void testCSV3_SP() {
+		runCSVTest(3, RUNTIME_PLATFORM.SPARK, false);
+	}
+
 	/**
 	 *
 	 * @param testNumber
@@ -135,13 +150,17 @@ public class ReadCSVTest extends AutomatedTestBase
 	 */
 	private void runCSVTest(int testNumber, RUNTIME_PLATFORM platform, boolean parallel)
 	{
-
 		RUNTIME_PLATFORM oldPlatform = rtplatform;
+		rtplatform = platform;
+
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
 		boolean oldpar = CompilerConfig.FLAG_PARREADWRITE_TEXT;

 		try
 		{
-			rtplatform = platform;
 			CompilerConfig.FLAG_PARREADWRITE_TEXT = parallel;

 			TestConfiguration config = getTestConfiguration(TEST_NAME);
@@ -167,10 +186,10 @@ public class ReadCSVTest extends AutomatedTestBase

 			TestUtils.compareScalars(dmlScalar, rScalar, eps);
 		}
-		finally
-		{
+		finally {
 			rtplatform = oldPlatform;
 			CompilerConfig.FLAG_PARREADWRITE_TEXT = oldpar;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 		}
 	}
