Repository: systemml
Updated Branches:
  refs/heads/master 7a3447a50 -> 25a10f412


[SYSTEMML-2503] Exploit existing hash partitioning in spark cumoff ops

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/21b1a531
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/21b1a531
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/21b1a531

Branch: refs/heads/master
Commit: 21b1a53141c74b4aa3af6e0263af3f6b0d7c1336
Parents: 7a3447a
Author: Matthias Boehm <[email protected]>
Authored: Wed Dec 5 19:39:53 2018 +0100
Committer: Matthias Boehm <[email protected]>
Committed: Wed Dec 5 19:39:53 2018 +0100

----------------------------------------------------------------------
 .../runtime/instructions/spark/CumulativeOffsetSPInstruction.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/21b1a531/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 53e6e91..8befc5a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -35,6 +35,7 @@ import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -95,7 +96,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
                JavaPairRDD<MatrixIndexes,MatrixBlock> inData = 
sec.getBinaryBlockRDDHandleForVariable(input1.getName());
                JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> 
joined = null;
                
-               if( _broadcast ) {
+               if( _broadcast && !SparkUtils.isHashPartitioned(inData) ) {
                        //broadcast offsets and broadcast join with data
                        PartitionedBroadcast<MatrixBlock> inAgg = 
sec.getBroadcastForVariable(input2.getName());
                        joined = inData.mapToPair(new 
RDDCumSplitLookupFunction(inAgg,_initValue, rlen, brlen));

Reply via email to