Repository: systemml
Updated Branches:
  refs/heads/master 7a3447a50 -> 25a10f412
[SYSTEMML-2503] Exploit existing hash partitioning in spark cumoff ops

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/21b1a531
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/21b1a531
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/21b1a531

Branch: refs/heads/master
Commit: 21b1a53141c74b4aa3af6e0263af3f6b0d7c1336
Parents: 7a3447a
Author: Matthias Boehm <[email protected]>
Authored: Wed Dec 5 19:39:53 2018 +0100
Committer: Matthias Boehm <[email protected]>
Committed: Wed Dec 5 19:39:53 2018 +0100

----------------------------------------------------------------------
 .../runtime/instructions/spark/CumulativeOffsetSPInstruction.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/systemml/blob/21b1a531/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 53e6e91..8befc5a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -35,6 +35,7 @@ import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -95,7 +96,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction {
 		JavaPairRDD<MatrixIndexes,MatrixBlock> inData = sec.getBinaryBlockRDDHandleForVariable(input1.getName());
 		JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> joined = null;
-		if( _broadcast ) {
+		if( _broadcast && !SparkUtils.isHashPartitioned(inData) ) {
 			//broadcast offsets and broadcast join with data
 			PartitionedBroadcast<MatrixBlock> inAgg = sec.getBroadcastForVariable(input2.getName());
 			joined = inData.mapToPair(new RDDCumSplitLookupFunction(inAgg,_initValue, rlen, brlen));
