Repository: incubator-systemml
Updated Branches:
  refs/heads/master 6efa0f36e -> 10b7b8669


[MINOR] Bugfix in Spark pmapmm operator

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/10b7b866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/10b7b866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/10b7b866

Branch: refs/heads/master
Commit: 10b7b8669bee522af49448db6bd37ae2ced24846
Parents: 6efa0f3
Author: Niketan Pansare <[email protected]>
Authored: Fri Feb 10 13:43:43 2017 -0800
Committer: Niketan Pansare <[email protected]>
Committed: Fri Feb 10 13:44:36 2017 -0800

----------------------------------------------------------------------
 .../runtime/instructions/spark/PMapmmSPInstruction.java   | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/10b7b866/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
index 5efba81..ee2cd1e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -97,9 +97,13 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
sec.getBinaryBlockRDDHandleForVariable( input2.getName() ); 
                MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());             
                
+               // This avoids errors such as 
java.lang.UnsupportedOperationException: Cannot change storage level of an RDD 
after it was already assigned a level
+               // Ideally, we should ensure that we do not redundantly call 
persist on the same RDD.
+               StorageLevel pmapmmStorageLevel = 
StorageLevel.MEMORY_AND_DISK();
+               
                //cache right hand side because accessed many times
                in2 = 
in2.repartition(sec.getSparkContext().defaultParallelism())
-                                .persist(StorageLevel.MEMORY_AND_DISK());
+                                .persist(pmapmmStorageLevel);
                
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
                for( int i=0; i<mc1.getRows(); 
i+=NUM_ROWBLOCKS*mc1.getRowsPerBlock() ) 
@@ -117,7 +121,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                        JavaPairRDD<MatrixIndexes,MatrixBlock> rdd2 = in2
                                        .flatMapToPair(new PMapMMFunction(bpmb, 
i/mc1.getRowsPerBlock()));
                        rdd2 = RDDAggregateUtils.sumByKeyStable(rdd2);
-                       rdd2.persist(StorageLevel.MEMORY_ONLY())
+                       rdd2.persist(pmapmmStorageLevel)
                            .count();
                        bpmb.unpersist(false);
                        
@@ -128,7 +132,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                }
                
                //cache final result
-               out = out.persist(StorageLevel.MEMORY_AND_DISK());
+               out = out.persist(pmapmmStorageLevel);
                out.count();
                
                //put output RDD handle into symbol table

Reply via email to