Repository: incubator-systemml Updated Branches: refs/heads/master 6efa0f36e -> 10b7b8669
[MINOR] Bugfix in Spark pmapmm operator Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/10b7b866 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/10b7b866 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/10b7b866 Branch: refs/heads/master Commit: 10b7b8669bee522af49448db6bd37ae2ced24846 Parents: 6efa0f3 Author: Niketan Pansare <[email protected]> Authored: Fri Feb 10 13:43:43 2017 -0800 Committer: Niketan Pansare <[email protected]> Committed: Fri Feb 10 13:44:36 2017 -0800 ---------------------------------------------------------------------- .../runtime/instructions/spark/PMapmmSPInstruction.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/10b7b866/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java index 5efba81..ee2cd1e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java @@ -97,9 +97,13 @@ public class PMapmmSPInstruction extends BinarySPInstruction JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = sec.getBinaryBlockRDDHandleForVariable( input2.getName() ); MatrixCharacteristics mc1 = sec.getMatrixCharacteristics(input1.getName()); + // This avoids errors such as java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level + // Ideally, we should ensure that we do not redundantly call persist on the same RDD. 
+ StorageLevel pmapmmStorageLevel = StorageLevel.MEMORY_AND_DISK(); + //cache right hand side because accessed many times in2 = in2.repartition(sec.getSparkContext().defaultParallelism()) - .persist(StorageLevel.MEMORY_AND_DISK()); + .persist(pmapmmStorageLevel); JavaPairRDD<MatrixIndexes,MatrixBlock> out = null; for( int i=0; i<mc1.getRows(); i+=NUM_ROWBLOCKS*mc1.getRowsPerBlock() ) @@ -117,7 +121,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction JavaPairRDD<MatrixIndexes,MatrixBlock> rdd2 = in2 .flatMapToPair(new PMapMMFunction(bpmb, i/mc1.getRowsPerBlock())); rdd2 = RDDAggregateUtils.sumByKeyStable(rdd2); - rdd2.persist(StorageLevel.MEMORY_ONLY()) + rdd2.persist(pmapmmStorageLevel) .count(); bpmb.unpersist(false); @@ -128,7 +132,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction } //cache final result - out = out.persist(StorageLevel.MEMORY_AND_DISK()); + out = out.persist(pmapmmStorageLevel); out.count(); //put output RDD handle into symbol table
