[SYSTEMML-1268] Replace accumulators with new accumulatorV2 framework

This patch globally replaces all uses of the deprecated accumulator API with the new accumulatorV2 framework; custom accumulators had to be reimplemented against the new API. Furthermore, we now avoid expensive double-to-long casting and use named accumulators for easier debugging in the Spark web UI.
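For context, the core migration pattern looks roughly as follows (a minimal sketch against the Spark 2.x Java API, mirroring the pattern used throughout this patch; the class name and local context are illustrative, not part of the patch):

  import java.util.Arrays;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.util.LongAccumulator;

  public class AccumulatorV2MigrationSketch {
      public static void main(String[] args) {
          // hypothetical local context, for illustration only
          JavaSparkContext sc = new JavaSparkContext("local[2]", "accv2-sketch");

          // old API (deprecated since Spark 2.0): Accumulator<Integer> aTasks = sc.accumulator(0);
          // new API: a built-in LongAccumulator, registered under a name that
          // appears in the web UI, and without Integer/Double boxing or casting
          LongAccumulator aTasks = sc.sc().longAccumulator("tasks");

          // add() runs on the executors; value() is read on the driver after the job
          sc.parallelize(Arrays.asList(1, 2, 3, 4))
            .foreach(i -> aTasks.add(1));
          long numTasks = aTasks.value(); // 4

          sc.stop();
      }
  }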
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/732e6da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/732e6da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/732e6da4

Branch: refs/heads/master
Commit: 732e6da4f924a99ba5fbddf656436fc1bd62668f
Parents: 12d79c5
Author: Matthias Boehm <[email protected]>
Authored: Tue Feb 14 21:56:14 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Wed Feb 15 10:49:21 2017 -0800

----------------------------------------------------------------------
 .../parfor/RemoteDPParForSpark.java             | 10 +--
 .../parfor/RemoteDPParForSparkWorker.java       |  9 +--
 .../parfor/RemoteParForSpark.java               | 10 +--
 .../parfor/RemoteParForSparkWorker.java         |  8 +-
 ...ReturnParameterizedBuiltinSPInstruction.java | 79 +++++++++++++-------
 .../instructions/spark/WriteSPInstruction.java  | 10 +--
 .../ComputeBinaryBlockNnzFunction.java          |  9 +--
 .../spark/utils/RDDConverterUtils.java          | 18 ++---
 8 files changed, 89 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 7286eca..8663038 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -23,9 +23,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -73,8 +73,8 @@ public class RemoteDPParForSpark
         InputInfo ii = InputInfo.BinaryBlockInputInfo;
 
         //initialize accumulators for tasks/iterations
-        Accumulator<Integer> aTasks = sc.accumulator(0);
-        Accumulator<Integer> aIters = sc.accumulator(0);
+        LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+        LongAccumulator aIters = sc.sc().longAccumulator("iterations");
 
         JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
         DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
@@ -88,8 +88,8 @@ public class RemoteDPParForSpark
 
         //de-serialize results
         LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
-        int numTasks = aTasks.value(); //get accumulator value
-        int numIters = aIters.value(); //get accumulator value
+        int numTasks = aTasks.value().intValue(); //get accumulator value
+        int numIters = aIters.value().intValue(); //get accumulator value
 
         //create output symbol table entries
         RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index c973115..e12d010 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -24,10 +24,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.spark.Accumulator;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
@@ -61,10 +60,10 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
     private boolean _tSparseCol = false;
     private PDataPartitionFormat _dpf = null;
 
-    private Accumulator<Integer> _aTasks = null;
-    private Accumulator<Integer> _aIters = null;
+    private LongAccumulator _aTasks = null;
+    private LongAccumulator _aIters = null;
 
-    public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, Accumulator<Integer> atasks, Accumulator<Integer> aiters)
+    public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters)
         throws DMLRuntimeException
     {
         //keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 47ea2e6..9d3f0f3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -64,8 +64,8 @@ public class RemoteParForSpark
         JavaSparkContext sc = sec.getSparkContext();
 
         //initialize accumulators for tasks/iterations
-        Accumulator<Integer> aTasks = sc.accumulator(0);
-        Accumulator<Integer> aIters = sc.accumulator(0);
+        LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+        LongAccumulator aIters = sc.sc().longAccumulator("iterations");
 
         //run remote_spark parfor job
         //(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
@@ -77,8 +77,8 @@ public class RemoteParForSpark
 
         //de-serialize results
         LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
-        int numTasks = aTasks.value(); //get accumulator value
-        int numIters = aIters.value(); //get accumulator value
+        int numTasks = aTasks.value().intValue(); //get accumulator value
+        int numIters = aIters.value().intValue(); //get accumulator value
 
         //create output symbol table entries
         RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 75e5137..e12376a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -23,9 +23,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
@@ -42,10 +42,10 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
     private String _prog = null;
     private boolean _caching = true;
 
-    private Accumulator<Integer> _aTasks = null;
-    private Accumulator<Integer> _aIters = null;
+    private LongAccumulator _aTasks = null;
+    private LongAccumulator _aIters = null;
 
-    public RemoteParForSparkWorker(String program, boolean cpCaching, Accumulator<Integer> atasks, Accumulator<Integer> aiters)
+    public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters)
         throws DMLRuntimeException
     {
         //keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index daa1ce5..5890bf9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -20,20 +20,19 @@ package org.apache.sysml.runtime.instructions.spark;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.util.AccumulatorV2;
 
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -124,7 +123,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
             Encoder encoderBuild = EncoderFactory.createEncoder(spec, colnames,
                 fo.getSchema(), (int)fo.getNumColumns(), null);
 
-            Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc());
+            MaxLongAccumulator accMax = registerMaxLongAccumulator(sec.getSparkContext());
             JavaRDD<String> rcMaps = in
                 .mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
                 .distinct().groupByKey()
@@ -190,6 +189,54 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
         return null;
     }
 
+    private static MaxLongAccumulator registerMaxLongAccumulator(JavaSparkContext sc) {
+        MaxLongAccumulator acc = new MaxLongAccumulator(Long.MIN_VALUE);
+        sc.sc().register(acc, "max");
+        return acc;
+    }
+
+
+    private static class MaxLongAccumulator extends AccumulatorV2<Long,Long>
+    {
+        private static final long serialVersionUID = -3739727823287550826L;
+
+        private long _value = Long.MIN_VALUE;
+
+        public MaxLongAccumulator(long value) {
+            _value = value;
+        }
+
+        @Override
+        public void add(Long arg0) {
+            _value = Math.max(_value, arg0);
+        }
+
+        @Override
+        public AccumulatorV2<Long, Long> copy() {
+            return new MaxLongAccumulator(_value);
+        }
+
+        @Override
+        public boolean isZero() {
+            return _value == Long.MIN_VALUE;
+        }
+
+        @Override
+        public void merge(AccumulatorV2<Long, Long> arg0) {
+            _value = Math.max(_value, arg0.value());
+        }
+
+        @Override
+        public void reset() {
+            _value = Long.MIN_VALUE;
+        }
+
+        @Override
+        public Long value() {
+            return _value;
+        }
+    }
+
     /**
      * This function pre-aggregates distinct values of recoded columns per partition
      * (part of distributed recode map construction, used for recoding, binning and
@@ -242,9 +289,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
     {
         private static final long serialVersionUID = -1034187226023517119L;
 
-        private Accumulator<Long> _accMax = null;
+        private MaxLongAccumulator _accMax = null;
 
-        public TransformEncodeGroupFunction( Accumulator<Long> accMax ) {
+        public TransformEncodeGroupFunction( MaxLongAccumulator accMax ) {
             _accMax = accMax;
         }
 
@@ -275,26 +322,6 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
         }
     }
 
-    private static class MaxAcc implements AccumulatorParam<Long>, Serializable
-    {
-        private static final long serialVersionUID = -3739727823287550826L;
-
-        @Override
-        public Long addInPlace(Long arg0, Long arg1) {
-            return Math.max(arg0, arg1);
-        }
-
-        @Override
-        public Long zero(Long arg0) {
-            return arg0;
-        }
-
-        @Override
-        public Long addAccumulator(Long arg0, Long arg1) {
-            return Math.max(arg0, arg1);
-        }
-    }
-
     public static class TransformEncodeBuild2Function implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata>
     {
         private static final long serialVersionUID = 6336375833412029279L;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 912dbe3..3387770 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -25,9 +25,9 @@ import java.util.Random;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -203,12 +203,12 @@ public class WriteSPInstruction extends SPInstruction
         else if( oi == OutputInfo.CSVOutputInfo ) {
             JavaRDD<String> out = null;
-            Accumulator<Double> aNnz = null;
+            LongAccumulator aNnz = null;
 
             if ( isInputMatrixBlock ) {
                 //piggyback nnz computation on actual write
                 if( !mc.nnzKnown() ) {
-                    aNnz = sec.getSparkContext().accumulator(0L);
+                    aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
                     in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
                 }
 
@@ -252,9 +252,9 @@ public class WriteSPInstruction extends SPInstruction
         }
         else if( oi == OutputInfo.BinaryBlockOutputInfo ) {
             //piggyback nnz computation on actual write
-            Accumulator<Double> aNnz = null;
+            LongAccumulator aNnz = null;
             if( !mc.nnzKnown() ) {
-                aNnz = sec.getSparkContext().accumulator(0L);
+                aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
                 in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
             }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
index f76784e..7b6daad 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
@@ -19,18 +19,17 @@
 
 package org.apache.sysml.runtime.instructions.spark.functions;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.Function;
-
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,MatrixBlock>
 {
     private static final long serialVersionUID = -8396410450821999936L;
 
-    private Accumulator<Double> _aNnz = null;
+    private LongAccumulator _aNnz = null;
 
-    public ComputeBinaryBlockNnzFunction( Accumulator<Double> aNnz )
+    public ComputeBinaryBlockNnzFunction( LongAccumulator aNnz )
     {
         _aNnz = aNnz;
     }
@@ -38,7 +37,7 @@ public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,Matri
     @Override
     public MatrixBlock call(MatrixBlock arg0) throws Exception
     {
-        _aNnz.add( (double)arg0.getNonZeros() );
+        _aNnz.add( arg0.getNonZeros() );
         return arg0;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index b5a4b58..1310b80 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -27,7 +27,6 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -166,7 +166,7 @@ public class RDDConverterUtils
 
         //determine unknown dimensions and sparsity if required
         if( !mc.dimsKnown(true) ) {
-            Accumulator<Double> aNnz = sc.accumulator(0L);
+            LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
             JavaRDD<String> tmp = input.values()
                 .map(new CSVAnalysisFunction(aNnz, delim));
             long rlen = tmp.count() - (hasHeader ? 1 : 0);
@@ -230,7 +230,7 @@ public class RDDConverterUtils
 
         //determine unknown dimensions and sparsity if required
         if( !mc.dimsKnown(true) ) {
-            Accumulator<Double> aNnz = sc.accumulator(0L);
+            LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
             JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, containsID, isVector));
             long rlen = tmp.count();
             long clen = !isVector ? df.columns().length - (containsID?1:0) :
@@ -531,10 +531,10 @@ public class RDDConverterUtils
     {
         private static final long serialVersionUID = 2310303223289674477L;
 
-        private Accumulator<Double> _aNnz = null;
+        private LongAccumulator _aNnz = null;
         private String _delim = null;
 
-        public CSVAnalysisFunction( Accumulator<Double> aNnz, String delim )
+        public CSVAnalysisFunction( LongAccumulator aNnz, String delim )
         {
             _aNnz = aNnz;
             _delim = delim;
@@ -552,7 +552,7 @@ public class RDDConverterUtils
             int lnnz = IOUtilFunctions.countNnz(cols);
 
             //update counters
-            _aNnz.add( (double)lnnz );
+            _aNnz.add( lnnz );
             return line;
         }
 
@@ -922,11 +922,11 @@ public class RDDConverterUtils
     {
         private static final long serialVersionUID = 5705371332119770215L;
 
-        private Accumulator<Double> _aNnz = null;
+        private LongAccumulator _aNnz = null;
         private boolean _containsID;
         private boolean _isVector;
 
-        public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean containsID, boolean isVector) {
+        public DataFrameAnalysisFunction( LongAccumulator aNnz, boolean containsID, boolean isVector) {
             _aNnz = aNnz;
             _containsID = containsID;
             _isVector = isVector;
@@ -940,7 +940,7 @@ public class RDDConverterUtils
             int lnnz = countNnz(vect, _isVector, off);
 
             //update counters
-            _aNnz.add( (double)lnnz );
+            _aNnz.add( lnnz );
             return arg0;
         }
     }
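
As a usage note, the reimplemented MaxLongAccumulator follows the standard AccumulatorV2 contract (reset/copy/merge must be consistent with isZero). The following minimal driver-side sketch illustrates the intended semantics; it assumes a local context, and since the class above is private to the instruction, the direct instantiation here is illustrative only:

  import java.util.Arrays;
  import org.apache.spark.api.java.JavaSparkContext;

  public class MaxAccumulatorSketch {
      public static void main(String[] args) {
          JavaSparkContext sc = new JavaSparkContext("local[2]", "max-acc-sketch");

          // register before use; the name makes the accumulator visible in the web UI
          MaxLongAccumulator accMax = new MaxLongAccumulator(Long.MIN_VALUE);
          sc.sc().register(accMax, "max");

          // per-partition copies are combined via merge(), which keeps the maximum
          sc.parallelize(Arrays.asList(3L, 9L, 5L), 2).foreach(accMax::add);
          System.out.println(accMax.value()); // 9

          sc.stop();
      }
  }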
