[SYSTEMML-1268] Replace accumulators with new accumulatorV2 framework

This patch globally replaces all uses of the deprecated Accumulator API with
the new AccumulatorV2 framework. For custom accumulators, this entailed a
reimplementation against the AccumulatorV2 contract. Furthermore, we now avoid
the expensive double-long casts previously needed for nnz counting, and use
named accumulators for easier debugging in the web UI.
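
For illustration only (not part of the commit): a minimal sketch of the
migration pattern, assuming a local Spark 2.x context; the class name and
sample data are hypothetical.

import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorV2MigrationSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "acc-v2-sketch");

        // Before (deprecated since Spark 2.0):
        //   Accumulator<Integer> aTasks = sc.accumulator(0);
        // After: a named LongAccumulator, listed under its name in the web UI.
        LongAccumulator aTasks = sc.sc().longAccumulator("tasks");

        sc.parallelize(Arrays.asList(1, 2, 3))
          .foreach(x -> aTasks.add(1)); // updated on the executors

        // value() returns java.lang.Long, hence the .intValue() narrowing
        // in the parfor drivers below.
        long numTasks = aTasks.value();
        System.out.println("tasks: " + numTasks);
        sc.close();
    }
}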

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/732e6da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/732e6da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/732e6da4

Branch: refs/heads/master
Commit: 732e6da4f924a99ba5fbddf656436fc1bd62668f
Parents: 12d79c5
Author: Matthias Boehm <[email protected]>
Authored: Tue Feb 14 21:56:14 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Wed Feb 15 10:49:21 2017 -0800

----------------------------------------------------------------------
 .../parfor/RemoteDPParForSpark.java             | 10 +--
 .../parfor/RemoteDPParForSparkWorker.java       |  9 +--
 .../parfor/RemoteParForSpark.java               | 10 +--
 .../parfor/RemoteParForSparkWorker.java         |  8 +-
 ...ReturnParameterizedBuiltinSPInstruction.java | 79 +++++++++++++-------
 .../instructions/spark/WriteSPInstruction.java  | 10 +--
 .../ComputeBinaryBlockNnzFunction.java          |  9 +--
 .../spark/utils/RDDConverterUtils.java          | 18 ++---
 8 files changed, 89 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 7286eca..8663038 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -23,9 +23,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -73,8 +73,8 @@ public class RemoteDPParForSpark
                InputInfo ii = InputInfo.BinaryBlockInputInfo;
                                
                //initialize accumulators for tasks/iterations
-               Accumulator<Integer> aTasks = sc.accumulator(0);
-               Accumulator<Integer> aIters = sc.accumulator(0);
+               LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+               LongAccumulator aIters = sc.sc().longAccumulator("iterations");
                
                JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
                DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
@@ -88,8 +88,8 @@ public class RemoteDPParForSpark
                
                //de-serialize results
                LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
-               int numTasks = aTasks.value(); //get accumulator value
-               int numIters = aIters.value(); //get accumulator value
+               int numTasks = aTasks.value().intValue(); //get accumulator value
+               int numIters = aIters.value().intValue(); //get accumulator value
                
                //create output symbol table entries
                RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index c973115..e12d010 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -24,10 +24,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.spark.Accumulator;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
@@ -61,10 +60,10 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
        private boolean _tSparseCol = false;
        private PDataPartitionFormat _dpf = null;
        
-       private Accumulator<Integer> _aTasks = null;
-       private Accumulator<Integer> _aIters = null;
+       private LongAccumulator _aTasks = null;
+       private LongAccumulator _aIters = null;
        
-       public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, Accumulator<Integer> atasks, Accumulator<Integer> aiters) 
+       public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters) 
                throws DMLRuntimeException
        {
                //keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 47ea2e6..9d3f0f3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -64,8 +64,8 @@ public class RemoteParForSpark
                JavaSparkContext sc = sec.getSparkContext();
                
                //initialize accumulators for tasks/iterations
-               Accumulator<Integer> aTasks = sc.accumulator(0);
-               Accumulator<Integer> aIters = sc.accumulator(0);
+               LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
+               LongAccumulator aIters = sc.sc().longAccumulator("iterations");
                
                //run remote_spark parfor job 
                //(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
@@ -77,8 +77,8 @@ public class RemoteParForSpark
                
                //de-serialize results
                LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
-               int numTasks = aTasks.value(); //get accumulator value
-               int numIters = aIters.value(); //get accumulator value
+               int numTasks = aTasks.value().intValue(); //get accumulator value
+               int numIters = aIters.value().intValue(); //get accumulator value
                
                //create output symbol table entries
                RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 75e5137..e12376a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -23,9 +23,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
@@ -42,10 +42,10 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
        private String  _prog = null;
        private boolean _caching = true;
        
-       private Accumulator<Integer> _aTasks = null;
-       private Accumulator<Integer> _aIters = null;
+       private LongAccumulator _aTasks = null;
+       private LongAccumulator _aIters = null;
        
-       public RemoteParForSparkWorker(String program, boolean cpCaching, Accumulator<Integer> atasks, Accumulator<Integer> aiters) 
+       public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
                throws DMLRuntimeException
        {
                //keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index daa1ce5..5890bf9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -20,20 +20,19 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.spark.Accumulator;
-import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.util.AccumulatorV2;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -124,7 +123,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                        Encoder encoderBuild = EncoderFactory.createEncoder(spec, colnames,
                                        fo.getSchema(), (int)fo.getNumColumns(), null);
                        
-                       Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc()); 
+                       MaxLongAccumulator accMax = registerMaxLongAccumulator(sec.getSparkContext()); 
                        JavaRDD<String> rcMaps = in
                                        .mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
                                        .distinct().groupByKey()
@@ -190,6 +189,54 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                return null;    
        }
        
+       private static MaxLongAccumulator registerMaxLongAccumulator(JavaSparkContext sc) {
+               MaxLongAccumulator acc = new MaxLongAccumulator(Long.MIN_VALUE);
+               sc.sc().register(acc, "max");
+               return acc;
+       }
+       
+
+       private static class MaxLongAccumulator extends AccumulatorV2<Long,Long>
+       {
+               private static final long serialVersionUID = -3739727823287550826L;
+
+               private long _value = Long.MIN_VALUE;
+               
+               public MaxLongAccumulator(long value) {
+                       _value = value;
+               }
+
+               @Override
+               public void add(Long arg0) {
+                       _value = Math.max(_value, arg0);
+               }
+
+               @Override
+               public AccumulatorV2<Long, Long> copy() {
+                       return new MaxLongAccumulator(_value);
+               }
+
+               @Override
+               public boolean isZero() {
+                       return _value == Long.MIN_VALUE;
+               }
+
+               @Override
+               public void merge(AccumulatorV2<Long, Long> arg0) {
+                       _value = Math.max(_value, arg0.value());
+               }
+
+               @Override
+               public void reset() {
+                       _value = Long.MIN_VALUE;
+               }
+
+               @Override
+               public Long value() {
+                       return _value;
+               }
+       }
+       
        /**
         * This function pre-aggregates distinct values of recoded columns per partition
         * (part of distributed recode map construction, used for recoding, binning and 
@@ -242,9 +289,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
        {
                private static final long serialVersionUID = -1034187226023517119L;
 
-               private Accumulator<Long> _accMax = null;
+               private MaxLongAccumulator _accMax = null;
                
-               public TransformEncodeGroupFunction( Accumulator<Long> accMax ) {
+               public TransformEncodeGroupFunction( MaxLongAccumulator accMax ) {
                        _accMax = accMax;
                }
                
@@ -275,26 +322,6 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                }
        }
 
-       private static class MaxAcc implements AccumulatorParam<Long>, Serializable 
-       {
-               private static final long serialVersionUID = -3739727823287550826L;
-
-               @Override
-               public Long addInPlace(Long arg0, Long arg1) {
-                       return Math.max(arg0, arg1);
-               }
-
-               @Override
-               public Long zero(Long arg0) {
-                       return arg0;
-               }
-
-               @Override
-               public Long addAccumulator(Long arg0, Long arg1) {
-                       return Math.max(arg0, arg1);    
-               }
-       }
-
        public static class TransformEncodeBuild2Function implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata>
        {
                private static final long serialVersionUID = 6336375833412029279L;
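
For context on the MaxLongAccumulator added above (not part of the commit): a
custom AccumulatorV2 must keep isZero(), copy(), reset(), add(), merge(), and
value() mutually consistent, and must be registered with the SparkContext
before use. A minimal usage sketch, assuming a local context and hypothetical
sample data:

import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.AccumulatorV2;

public class MaxAccumulatorUsageSketch {
    // Mirrors the accumulator added in this patch: tracks a running maximum,
    // with Long.MIN_VALUE as the "zero" (empty) state.
    public static class MaxLongAccumulator extends AccumulatorV2<Long, Long> {
        private long _value;
        public MaxLongAccumulator(long value) { _value = value; }
        @Override public void add(Long v) { _value = Math.max(_value, v); }
        @Override public AccumulatorV2<Long, Long> copy() { return new MaxLongAccumulator(_value); }
        @Override public boolean isZero() { return _value == Long.MIN_VALUE; }
        @Override public void merge(AccumulatorV2<Long, Long> other) { _value = Math.max(_value, other.value()); }
        @Override public void reset() { _value = Long.MIN_VALUE; }
        @Override public Long value() { return _value; }
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "max-acc-sketch");
        MaxLongAccumulator accMax = new MaxLongAccumulator(Long.MIN_VALUE);
        sc.sc().register(accMax, "max"); // named, so it appears in the web UI

        sc.parallelize(Arrays.asList(3L, 7L, 5L))
          .foreach(accMax::add); // per-task copies are merged on the driver

        System.out.println("max: " + accMax.value()); // 7
        sc.close();
    }
}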

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 912dbe3..3387770 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -25,9 +25,9 @@ import java.util.Random;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -203,12 +203,12 @@ public class WriteSPInstruction extends SPInstruction
                else if( oi == OutputInfo.CSVOutputInfo ) 
                {
                        JavaRDD<String> out = null;
-                       Accumulator<Double> aNnz = null;
+                       LongAccumulator aNnz = null;
                        
                        if ( isInputMatrixBlock ) {
                                //piggyback nnz computation on actual write
                                if( !mc.nnzKnown() ) {
-                                       aNnz = sec.getSparkContext().accumulator(0L);
+                                       aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
                                        in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
                                }       
                                
@@ -252,9 +252,9 @@ public class WriteSPInstruction extends SPInstruction
                }
                else if( oi == OutputInfo.BinaryBlockOutputInfo ) {
                        //piggyback nnz computation on actual write
-                       Accumulator<Double> aNnz = null;
+                       LongAccumulator aNnz = null;
                        if( !mc.nnzKnown() ) {
-                               aNnz = sec.getSparkContext().accumulator(0L);
+                               aNnz = sec.getSparkContext().sc().longAccumulator("nnz");
                                in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
                        }
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
index f76784e..7b6daad 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ComputeBinaryBlockNnzFunction.java
@@ -19,18 +19,17 @@
 
 package org.apache.sysml.runtime.instructions.spark.functions;
 
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.Function;
-
+import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,MatrixBlock> 
 {
        private static final long serialVersionUID = -8396410450821999936L;
        
-       private Accumulator<Double> _aNnz = null;
+       private LongAccumulator _aNnz = null;
        
-       public ComputeBinaryBlockNnzFunction( Accumulator<Double> aNnz )
+       public ComputeBinaryBlockNnzFunction( LongAccumulator aNnz )
        {
                _aNnz = aNnz;
        }
@@ -38,7 +37,7 @@ public class ComputeBinaryBlockNnzFunction implements Function<MatrixBlock,Matri
        @Override
        public MatrixBlock call(MatrixBlock arg0) throws Exception 
        {
-               _aNnz.add( (double)arg0.getNonZeros() );
+               _aNnz.add( arg0.getNonZeros() );
                return arg0;
        }
 }
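
The pattern above piggybacks the nnz count on a pass the job performs anyway.
A minimal stand-alone sketch of the same idea (not part of the commit; the
long[] blocks and local context are hypothetical stand-ins for MatrixBlock and
the real write pipeline):

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class PiggybackNnzSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "nnz-sketch");
        LongAccumulator aNnz = sc.sc().longAccumulator("nnz");

        JavaRDD<long[]> blocks = sc.parallelize(Arrays.asList(
            new long[]{0, 1, 0, 2}, new long[]{3, 0, 0, 0}));

        // Count non-zeros as a side effect while the data flows onward,
        // mirroring in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)).
        blocks.map(b -> {
            aNnz.add(Arrays.stream(b).filter(v -> v != 0).count());
            return b;
        }).count(); // stands in for the actual write action

        System.out.println("nnz: " + aNnz.value()); // 3
        sc.close();
    }
}

As with the original code, updates made inside a transformation can be
re-applied if a task is retried; Spark only guarantees exactly-once
accumulator updates inside actions.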

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/732e6da4/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index b5a4b58..1310b80 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -27,7 +27,6 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.util.LongAccumulator;
 
 import scala.Tuple2;
 
@@ -166,7 +166,7 @@ public class RDDConverterUtils
        {
                //determine unknown dimensions and sparsity if required
                if( !mc.dimsKnown(true) ) {
-                       Accumulator<Double> aNnz = sc.accumulator(0L);
+                       LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
                        JavaRDD<String> tmp = input.values()
                                        .map(new CSVAnalysisFunction(aNnz, delim));
                        long rlen = tmp.count() - (hasHeader ? 1 : 0);
@@ -230,7 +230,7 @@ public class RDDConverterUtils
        {
                //determine unknown dimensions and sparsity if required
                if( !mc.dimsKnown(true) ) {
-                       Accumulator<Double> aNnz = sc.accumulator(0L);
+                       LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
                        JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, containsID, isVector));
                        long rlen = tmp.count();
                        long clen = !isVector ? df.columns().length - (containsID?1:0) : 
@@ -531,10 +531,10 @@ public class RDDConverterUtils
        {
                private static final long serialVersionUID = 2310303223289674477L;
 
-               private Accumulator<Double> _aNnz = null;
+               private LongAccumulator _aNnz = null;
                private String _delim = null;
                
-               public CSVAnalysisFunction( Accumulator<Double> aNnz, String delim )
+               public CSVAnalysisFunction( LongAccumulator aNnz, String delim )
                {
                        _aNnz = aNnz;
                        _delim = delim;
@@ -552,7 +552,7 @@ public class RDDConverterUtils
                        int lnnz = IOUtilFunctions.countNnz(cols);
                        
                        //update counters
-                       _aNnz.add( (double)lnnz );
+                       _aNnz.add( lnnz );
                        
                        return line;
                }
@@ -922,11 +922,11 @@ public class RDDConverterUtils
        {       
                private static final long serialVersionUID = 5705371332119770215L;
                
-               private Accumulator<Double> _aNnz = null;
+               private LongAccumulator _aNnz = null;
                private boolean _containsID;
                private boolean _isVector;
                
-               public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean containsID, boolean isVector) {
+               public DataFrameAnalysisFunction( LongAccumulator aNnz, boolean containsID, boolean isVector) {
                        _aNnz = aNnz;
                        _containsID = containsID;
                        _isVector = isVector;
@@ -940,7 +940,7 @@ public class RDDConverterUtils
                        int lnnz = countNnz(vect, _isVector, off);
                        
                        //update counters
-                       _aNnz.add( (double)lnnz );
+                       _aNnz.add( lnnz );
                        return arg0;
                }
        }
