Repository: incubator-systemml Updated Branches: refs/heads/master 5baac2d62 -> f32bd8ebd
[SYSTEMML-1317] Avoid unnecessary copies on creating combiner blocks For most aggregations (e.g., merge of partial blocks, or aggregate by key), we use combineByKey to allow for update-in-place when adding to combiner blocks. However, so far we deep copy - often unnecessarily - blocks when creating the combiner blocks. This creates unnecessary memory pressure in scenarios where there is almost no aggregation. We now exploit the knowledge of temporary blocks to allow aggregations with shallow copy to reduce garbage collection overhead and memory pressure. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9919a6c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9919a6c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9919a6c6 Branch: refs/heads/master Commit: 9919a6c61a288267805f406cb5c1e1030ddf0a20 Parents: 5baac2d Author: Matthias Boehm <[email protected]> Authored: Wed Mar 15 18:53:09 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Mar 16 11:30:41 2017 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 5 +- .../parfor/ResultMergeRemoteSpark.java | 2 +- .../spark/AggregateTernarySPInstruction.java | 2 +- .../spark/AggregateUnarySPInstruction.java | 2 +- .../instructions/spark/CpmmSPInstruction.java | 2 +- .../spark/CumulativeAggregateSPInstruction.java | 2 +- .../instructions/spark/MapmmSPInstruction.java | 2 +- .../instructions/spark/PMapmmSPInstruction.java | 2 +- .../ParameterizedBuiltinSPInstruction.java | 8 +-- .../instructions/spark/PmmSPInstruction.java | 2 +- .../spark/QuaternarySPInstruction.java | 2 +- .../spark/ReblockSPInstruction.java | 2 +- .../instructions/spark/ReorgSPInstruction.java | 2 +- .../instructions/spark/RmmSPInstruction.java | 2 +- .../instructions/spark/SpoofSPInstruction.java | 8 +-- .../instructions/spark/Tsmm2SPInstruction.java | 2 +- .../spark/utils/FrameRDDConverterUtils.java | 4 +- .../spark/utils/RDDAggregateUtils.java | 57 ++++++++++++++++---- .../spark/utils/RDDConverterUtils.java | 4 +- .../spark/utils/RDDConverterUtilsExt.java | 2 +- .../instructions/spark/utils/RDDSortUtils.java | 8 +-- .../java/org/apache/sysml/utils/Statistics.java | 6 ++- 22 files changed, 84 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index faababe..648c51e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -1185,8 +1185,9 @@ public class SparkExecutionContext extends ExecutionContext in = in.coalesce( numPartitions ); } - //repartition rdd (force creation of shuffled rdd via merge) - JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in); + //repartition rdd (force creation of shuffled rdd via merge), note: without deep copy albeit + //executed on the original data, because there will be no merge, i.e., no key duplicates + JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in, false); //convert mcsr into memory-efficient csr if potentially sparse if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java index ef1ae02..49293e4 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java @@ -162,7 +162,7 @@ public class ResultMergeRemoteSpark extends ResultMerge else { //direct merge in any order (disjointness guaranteed) - out = RDDAggregateUtils.mergeByKey(rdd); + out = RDDAggregateUtils.mergeByKey(rdd, false); } //Step 3: create output rdd handle w/ lineage http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java index 2a305be..6a494b7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java @@ -116,7 +116,7 @@ public class AggregateTernarySPInstruction extends ComputationSPInstruction else //tack+* multi block { //multi-block aggregation and drop correction - out = RDDAggregateUtils.aggByKeyStable(out, aggop.aggOp); + out = RDDAggregateUtils.aggByKeyStable(out, aggop.aggOp, false); out = out.mapValues( new AggregateDropCorrectionFunction(aggop.aggOp) ); //put output RDD handle into symbol table http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java index eb9324f..73f67a3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java @@ -118,7 +118,7 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction else if( _aggtype == SparkAggType.MULTI_BLOCK ) { //in case of multi-block aggregation, we always keep the correction out = out.mapToPair(new RDDUAggFunction(auop, mc.getRowsPerBlock(), mc.getColsPerBlock())); - out = RDDAggregateUtils.aggByKeyStable(out, aggop); + out = RDDAggregateUtils.aggByKeyStable(out, aggop, false); //drop correction after aggregation if required (aggbykey creates //partitioning, drop correction via partitioning-preserving mapvalues) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java index 4227320..4a29c5e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java @@ -111,7 +111,7 @@ public class CpmmSPInstruction extends BinarySPInstruction } else //DEFAULT: MULTI_BLOCK { - out = RDDAggregateUtils.sumByKeyStable(out); + out = RDDAggregateUtils.sumByKeyStable(out, false); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java index e0cad17..9dd81aa 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java @@ -78,7 +78,7 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr; JavaPairRDD<MatrixIndexes,MatrixBlock> out = in.mapToPair(new RDDCumAggFunction(auop, rlen, brlen, bclen)); - out = RDDAggregateUtils.mergeByKey(out); + out = RDDAggregateUtils.mergeByKey(out, false); //put output handle in symbol table sec.setRDDHandleForVariable(output.getName(), out); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java index 33eacbe..5baffb0 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java @@ -149,7 +149,7 @@ public class MapmmSPInstruction extends BinarySPInstruction out = out.filter(new FilterNonEmptyBlocksFunction()); if( _aggtype == SparkAggType.MULTI_BLOCK ) - out = RDDAggregateUtils.sumByKeyStable(out); + out = RDDAggregateUtils.sumByKeyStable(out, false); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java index ee2cd1e..eceeabc 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java @@ -120,7 +120,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction //matrix multiplication JavaPairRDD<MatrixIndexes,MatrixBlock> rdd2 = in2 .flatMapToPair(new PMapMMFunction(bpmb, i/mc1.getRowsPerBlock())); - rdd2 = RDDAggregateUtils.sumByKeyStable(rdd2); + rdd2 = RDDAggregateUtils.sumByKeyStable(rdd2, false); rdd2.persist(pmapmmStorageLevel) .count(); bpmb.unpersist(false); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index d7f4f7d..5c27f60 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -225,7 +225,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction target.flatMapToPair(new RDDMapGroupedAggFunction(groups, _optr, ngroups, mc1.getRowsPerBlock(), mc1.getColsPerBlock())); - out = RDDAggregateUtils.sumByKeyStable(out); + out = RDDAggregateUtils.sumByKeyStable(out, false); //updated characteristics and handle outputs mcOut.set(ngroups, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1); @@ -355,7 +355,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction .flatMapToPair(new RDDRemoveEmptyFunction(rows, maxDim, brlen, bclen)); } - out = RDDAggregateUtils.mergeByKey(out); + out = RDDAggregateUtils.mergeByKey(out, false); //store output rdd handle sec.setRDDHandleForVariable(output.getName(), out); @@ -414,7 +414,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction //execute remove empty rows/cols operation JavaPairRDD<MatrixIndexes,MatrixBlock> out = in .flatMapToPair(new RDDRExpandFunction(maxVal, dirRows, cast, ignore, brlen, bclen)); - out = RDDAggregateUtils.mergeByKey(out); + out = RDDAggregateUtils.mergeByKey(out, false); //store output rdd handle sec.setRDDHandleForVariable(output.getName(), out); @@ -484,7 +484,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction if( mc.getCols() > mc.getNumColBlocks() ) { in = in.mapToPair(new RDDTransformDecodeExpandFunction( (int)mc.getCols(), mc.getColsPerBlock())); - in = RDDAggregateUtils.mergeByKey(in); + in = RDDAggregateUtils.mergeByKey(in, false); } //construct decoder and decode individual matrix blocks http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java index ce87c46..2e6b46e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java @@ -103,7 +103,7 @@ public class PmmSPInstruction extends BinarySPInstruction //execute pmm instruction JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1 .flatMapToPair( new RDDPMMFunction(_type, in2, rlen, mc.getRowsPerBlock()) ); - out = RDDAggregateUtils.sumByKeyStable(out); + out = RDDAggregateUtils.sumByKeyStable(out, false); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java index 0ae8a67..7dfe4be 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java @@ -304,7 +304,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction { //aggregation if required (map/redwdivmm) if( qop.wtype3 != null && !qop.wtype3.isBasic() ) - out = RDDAggregateUtils.sumByKeyStable( out ); + out = RDDAggregateUtils.sumByKeyStable(out, false); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java index a324f9d..f99d47f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java @@ -178,7 +178,7 @@ public class ReblockSPInstruction extends UnarySPInstruction JavaPairRDD<MatrixIndexes, MatrixBlock> out = in1.flatMapToPair(new ExtractBlockForBinaryReblock(mc, mcOut)); - out = RDDAggregateUtils.mergeByKey( out ); + out = RDDAggregateUtils.mergeByKey(out, false); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java index 62ffbce..ce1c584 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java @@ -139,7 +139,7 @@ public class ReorgSPInstruction extends UnarySPInstruction //execute reverse reorg operation out = in1.flatMapToPair(new RDDRevFunction(mcIn)); if( mcIn.getRows() % mcIn.getRowsPerBlock() != 0 ) - out = RDDAggregateUtils.mergeByKey(out); + out = RDDAggregateUtils.mergeByKey(out, false); } else if ( opcode.equalsIgnoreCase("rdiag") ) // DIAG { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java index e6b5755..1fe025a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java @@ -95,7 +95,7 @@ public class RmmSPInstruction extends BinarySPInstruction JavaPairRDD<MatrixIndexes,MatrixBlock> out = tmp1.join( tmp2 ) //join by result block .mapToPair( new RmmMultiplyFunction() ); //do matrix multiplication - out = RDDAggregateUtils.sumByKeyStable(out); //aggregation per result block + out = RDDAggregateUtils.sumByKeyStable(out, false); //aggregation per result block //put output block into symbol table (no lineage because single block) updateBinaryMMOutputMatrixCharacteristics(sec, true); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java index c78613d..ca92b9b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java @@ -123,9 +123,9 @@ public class SpoofSPInstruction extends SPInstruction //NOTE: workaround with partition size needed due to potential bug in SPARK //TODO investigate if some other side effect of correct blocks if( out.partitions().size() > mcIn.getNumRowBlocks() ) - out = RDDAggregateUtils.sumByKeyStable(out, (int)mcIn.getNumRowBlocks()); + out = RDDAggregateUtils.sumByKeyStable(out, (int)mcIn.getNumRowBlocks(), false); else - out = RDDAggregateUtils.sumByKeyStable(out); + out = RDDAggregateUtils.sumByKeyStable(out, false); } sec.setRDDHandleForVariable(_out.getName(), out); @@ -158,9 +158,9 @@ public class SpoofSPInstruction extends SPInstruction //NOTE: workaround with partition size needed due to potential bug in SPARK //TODO investigate if some other side effect of correct blocks if( in.partitions().size() > mcOut.getNumRowBlocks()*mcOut.getNumColBlocks() ) - out = RDDAggregateUtils.sumByKeyStable( out, (int)(mcOut.getNumRowBlocks()*mcOut.getNumColBlocks()) ); + out = RDDAggregateUtils.sumByKeyStable(out, (int)(mcOut.getNumRowBlocks()*mcOut.getNumColBlocks()), false); else - out = RDDAggregateUtils.sumByKeyStable( out ); + out = RDDAggregateUtils.sumByKeyStable(out, false); } sec.setRDDHandleForVariable(_out.getName(), out); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java index da917b5..1f1b3e4 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java @@ -119,7 +119,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction else { //output individual output blocks and aggregate by key (no action) JavaPairRDD<MatrixIndexes,MatrixBlock> tmp2 = in.flatMapToPair(new RDDTSMM2Function(bpmb, _type)); - JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.sumByKeyStable(tmp2); + JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.sumByKeyStable(tmp2, false); //put output RDD handle into symbol table sec.getMatrixCharacteristics(output.getName()).set(outputDim, outputDim, mc.getRowsPerBlock(), mc.getColsPerBlock()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 013a1a8..b736931 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -208,7 +208,7 @@ public class FrameRDDConverterUtils //shuffle matrix blocks (instead of frame blocks) in order to exploit //sparse formats (for sparse or wide matrices) during shuffle - in = RDDAggregateUtils.mergeByKey(in); + in = RDDAggregateUtils.mergeByKey(in, false); } //convert individual matrix blocks to frame blocks (w/o shuffle) @@ -223,7 +223,7 @@ public class FrameRDDConverterUtils .flatMapToPair(new BinaryBlockToMatrixBlockFunction(mcIn, mcOut)); //aggregate partial matrix blocks - return RDDAggregateUtils.mergeByKey( out ); + return RDDAggregateUtils.mergeByKey(out, false); } //===================================== http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java index 2dfff74..2759f7f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java @@ -42,8 +42,7 @@ import org.apache.sysml.runtime.matrix.operators.AggregateOperator; * */ public class RDDAggregateUtils -{ - +{ //internal configuration to use tree aggregation (treeReduce w/ depth=2), //this is currently disabled because it was 2x slower than a simple //single-block reduce due to additional overhead for shuffling @@ -69,15 +68,21 @@ public class RDDAggregateUtils } } - public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( JavaPairRDD<MatrixIndexes, MatrixBlock> in ) { - return sumByKeyStable(in, in.getNumPartitions()); + public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable(JavaPairRDD<MatrixIndexes, MatrixBlock> in) { + return sumByKeyStable(in, in.getNumPartitions(), true); + } + + public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable(JavaPairRDD<MatrixIndexes, MatrixBlock> in, + boolean deepCopyCombiner) { + return sumByKeyStable(in, in.getNumPartitions(), deepCopyCombiner); } - public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( JavaPairRDD<MatrixIndexes, MatrixBlock> in, int numPartitions ) + public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable(JavaPairRDD<MatrixIndexes, MatrixBlock> in, + int numPartitions, boolean deepCopyCombiner) { //stable sum of blocks per key, by passing correction blocks along with aggregates JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = - in.combineByKey( new CreateCorrBlockCombinerFunction(), + in.combineByKey( new CreateCorrBlockCombinerFunction(deepCopyCombiner), new MergeSumBlockValueFunction(), new MergeSumBlockCombinerFunction(), numPartitions ); @@ -134,13 +139,24 @@ public class RDDAggregateUtils new AggregateSingleBlockFunction(aop) ); } - public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( JavaPairRDD<MatrixIndexes, MatrixBlock> in, AggregateOperator aop ) + public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( JavaPairRDD<MatrixIndexes, MatrixBlock> in, + AggregateOperator aop) { + return aggByKeyStable(in, aop, in.getNumPartitions(), true); + } + + public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( JavaPairRDD<MatrixIndexes, MatrixBlock> in, + AggregateOperator aop, boolean deepCopyCombiner ) { + return aggByKeyStable(in, aop, in.getNumPartitions(), deepCopyCombiner); + } + + public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( JavaPairRDD<MatrixIndexes, MatrixBlock> in, + AggregateOperator aop, int numPartitions, boolean deepCopyCombiner ) { //stable sum of blocks per key, by passing correction blocks along with aggregates JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = - in.combineByKey( new CreateCorrBlockCombinerFunction(), + in.combineByKey( new CreateCorrBlockCombinerFunction(deepCopyCombiner), new MergeAggBlockValueFunction(aop), - new MergeAggBlockCombinerFunction(aop) ); + new MergeAggBlockCombinerFunction(aop), numPartitions ); //strip-off correction blocks from JavaPairRDD<MatrixIndexes, MatrixBlock> out = @@ -170,6 +186,21 @@ public class RDDAggregateUtils * assumption of disjoint data is violated. * * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} + * @param deepCopyCombiner indicator if the createCombiner functions needs to deep copy the input block + * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} + */ + public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( JavaPairRDD<MatrixIndexes, MatrixBlock> in, + boolean deepCopyCombiner ) { + return mergeByKey(in, in.getNumPartitions(), deepCopyCombiner); + } + + /** + * Merges disjoint data of all blocks per key. + * + * Note: The behavior of this method is undefined for both sparse and dense data if the + * assumption of disjoint data is violated. + * + * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} * @param numPartitions number of output partitions * @param deepCopyCombiner indicator if the createCombiner functions needs to deep copy the input block * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>} @@ -205,13 +236,19 @@ public class RDDAggregateUtils { private static final long serialVersionUID = -3666451526776017343L; + private final boolean _deep; + + public CreateCorrBlockCombinerFunction(boolean deep) { + _deep = deep; + } + @Override public CorrMatrixBlock call(MatrixBlock arg0) throws Exception { //deep copy to allow update in-place return new CorrMatrixBlock( - new MatrixBlock(arg0)); + _deep ? new MatrixBlock(arg0) : arg0); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java index 134a071..c5058a6 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java @@ -92,7 +92,7 @@ public class RDDConverterUtils } //aggregate partial matrix blocks - out = RDDAggregateUtils.mergeByKey( out ); + out = RDDAggregateUtils.mergeByKey(out, false); return out; } @@ -112,7 +112,7 @@ public class RDDConverterUtils } //aggregate partial matrix blocks - out = RDDAggregateUtils.mergeByKey( out ); + out = RDDAggregateUtils.mergeByKey(out, false); return out; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index cdf090d..6409735 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -110,7 +110,7 @@ public class RDDConverterUtilsExt } //aggregate partial matrix blocks - out = RDDAggregateUtils.mergeByKey( out ); + out = RDDAggregateUtils.mergeByKey(out, false); return out; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java index de1190a..046c015 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java @@ -66,7 +66,7 @@ public class RDDSortUtils JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals .zipWithIndex() .mapPartitionsToPair(new ConvertToBinaryBlockFunction(rlen, brlen)); - ret = RDDAggregateUtils.mergeByKey(ret); + ret = RDDAggregateUtils.mergeByKey(ret, false); return ret; } @@ -88,7 +88,7 @@ public class RDDSortUtils JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals .zipWithIndex() .mapPartitionsToPair(new ConvertToBinaryBlockFunction2(rlen, brlen)); - ret = RDDAggregateUtils.mergeByKey(ret); + ret = RDDAggregateUtils.mergeByKey(ret, false); return ret; } @@ -111,7 +111,7 @@ public class RDDSortUtils JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals .zipWithIndex() .mapPartitionsToPair(new ConvertToBinaryBlockFunction3(rlen, brlen)); - ret = RDDAggregateUtils.mergeByKey(ret); + ret = RDDAggregateUtils.mergeByKey(ret, false); return ret; } @@ -137,7 +137,7 @@ public class RDDSortUtils .mapToPair(new ExtractIndexFunction()) .sortByKey() .mapPartitionsToPair(new ConvertToBinaryBlockFunction4(rlen, brlen)); - ixmap = RDDAggregateUtils.mergeByKey(ixmap); + ixmap = RDDAggregateUtils.mergeByKey(ixmap, false); //replicate indexes for all column blocks JavaPairRDD<MatrixIndexes, MatrixBlock> rixmap = ixmap http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/utils/Statistics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/utils/Statistics.java b/src/main/java/org/apache/sysml/utils/Statistics.java index 801e402..097f58e 100644 --- a/src/main/java/org/apache/sysml/utils/Statistics.java +++ b/src/main/java/org/apache/sysml/utils/Statistics.java @@ -175,9 +175,11 @@ public class Statistics public static void resetNoOfCompiledJobs( int count ) { //reset both mr/sp for multiple tests within one jvm numCompiledSPInst.reset(); - numCompiledSPInst.add(count); numCompiledMRJobs.reset(); - numCompiledMRJobs.add(count); + if( OptimizerUtils.isSparkExecutionMode() ) + numCompiledSPInst.add(count); + else + numCompiledMRJobs.add(count); } public static void resetNoOfExecutedJobs() {
