Repository: incubator-systemml
Updated Branches:
  refs/heads/master b6a46500d -> b78c12593


[SYSTEMML-1350] Performance parfor spark datapartition-execute jobs

This patch makes the following performance and robustness improvements
to the parfor spark datapartition-execute and datapartition jobs:

(1) Data-size-dependent number of reduce tasks. So far, we used at most
the number of cores to achieve the best pre-aggregation of results.
However, on Spark, too small a number of reduce tasks (and hence overly
large reduce partitions) can lead to 2GB-limit issues and OOMs due to
increased memory pressure. We now determine the number of reduce tasks
in a data-size-aware manner.
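
As a rough sketch of the new logic (simplified from the diff below;
reducerGroups is the format-dependent number of output partitions), the
configured parallelism now acts as a lower bound, while the
data-size-dependent preferred partition count bounds the size of the
reduce partitions:

  //data-size-aware reducer count (sketch of determineNumReducers)
  int numParts = SparkUtils.getNumPreferredPartitions(mc, in); //grows with data size
  long numRed = Math.max(_numRed, Math.min(numParts, reducerGroups));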

(2) Reuse of matrix block partitions. For dense matrix block partitions,
we now reuse already allocated blocks across collected partitions in
order to reduce GC overhead.
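
Concretely, the collect code now threads the previous block through and
resets it instead of allocating a new one (a sketch of the pattern in
collectBinaryBlock, see the diff below):

  //reuse an already allocated dense partition block
  if( partition != null )
      partition.reset(_rlen, _clen, false); //keeps the allocated dense array
  else
      partition = new MatrixBlock(_rlen, _clen, false);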

(3) Incremental nnz maintenance. Finally, we now also maintain the nnz
incrementally during partition collect, based on the block meta data,
instead of recomputing it, which avoids an unnecessary scan per
partition.
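
In essence, the collect loop now sums the nnz from the block meta data
and sets it on the assembled partition (again a sketch, see
collectBinaryBlock in the diff below):

  long lnnz = 0;
  for( Writable val : valueList ) {
      PairWritableBlock pval = (PairWritableBlock) val;
      //...copy or append pval.block into the partition...
      lnnz += pval.block.getNonZeros(); //meta data lookup, no scan
  }
  partition.setNonZeros(lnnz); //avoids a recomputeNonZeros() pass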

Together, these changes improved the runtime of the 80GB
univariate/bivariate perftest from 379s/466s to 337s/376s on a small
1+5 node cluster, while also ensuring much better robustness for larger
datasets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/baa70a15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/baa70a15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/baa70a15

Branch: refs/heads/master
Commit: baa70a150b7e01f81f72ac1ce0ca07fab9a18265
Parents: b6a4650
Author: Matthias Boehm <[email protected]>
Authored: Fri Feb 24 15:02:53 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Feb 25 11:51:02 2017 -0800

----------------------------------------------------------------------
 .../parfor/DataPartitionerRemoteSpark.java      |  36 ++++---
 .../DataPartitionerRemoteSparkReducer.java      |  20 ++--
 .../parfor/RemoteDPParForSpark.java             |  28 +++--
 .../parfor/RemoteDPParForSparkWorker.java       | 107 ++++++++-----------
 4 files changed, 89 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
index 41fb235..be758d2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
@@ -27,6 +27,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -66,26 +67,24 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
 
                try
                {
-                   //cleanup existing output files
-                   MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-       
-                   //determine degree of parallelism
-                       int numRed = (int)determineNumReducers(rlen, clen, brlen, bclen, _numRed);
-       
+                       //cleanup existing output files
+                       MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
                        //get input rdd
                        JavaPairRDD<MatrixIndexes, MatrixBlock> inRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
                                        sec.getRDDHandleForMatrixObject(in, InputInfo.BinaryBlockInputInfo);
-                       MatrixCharacteristics mc = in.getMatrixCharacteristics();
                        
+                       //determine degree of parallelism
+                       MatrixCharacteristics mc = in.getMatrixCharacteristics();
+                       int numRed = (int)determineNumReducers(inRdd, mc, _numRed);
+       
                        //run spark remote data partition job 
                        DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format);
                        DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi);
                        inRdd.flatMapToPair(dpfun) //partition the input blocks
                             .groupByKey(numRed)   //group partition blocks
-                            .foreach( wfun );     //write partitions to hdfs 
+                            .foreach(wfun);       //write partitions to hdfs 
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
                
@@ -97,12 +96,17 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
                }
        }
 
-       private long determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed)
+       private long determineNumReducers(JavaPairRDD<MatrixIndexes,MatrixBlock> in,
+               MatrixCharacteristics mc, long numRed)
        {
-               //set the number of mappers and reducers 
+               long rlen = mc.getRows();
+               long clen = mc.getCols();
+               int brlen = mc.getRowsPerBlock();
+               int bclen = mc.getColsPerBlock();
+               
+               //determine number of reducer groups 
            long reducerGroups = -1;
-           switch( _format )
-           {
+           switch( _format ) {
                    case ROW_WISE: reducerGroups = rlen; break;
                    case COLUMN_WISE: reducerGroups = clen; break;
                    case ROW_BLOCK_WISE: reducerGroups = (rlen/brlen)+((rlen%brlen==0)?0:1); break;
@@ -113,6 +117,8 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
                                //do nothing
            }
            
-           return (int)Math.min( numRed, reducerGroups);       
+         //compute number of reducers (to avoid OOMs and reduce memory pressure)
+         int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
+         return Math.max(numRed, Math.min(numParts, reducerGroups));
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index 8caac98..d8bb04d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.spark.api.java.function.VoidFunction;
 
 import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -40,16 +40,12 @@ import scala.Tuple2;
 
 public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Long, Iterable<Writable>>> 
 {
-       
        private static final long serialVersionUID = -7149865018683261964L;
        
        private String _fnameNew = null;
        
-       public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi) 
-               throws DMLRuntimeException
-       {
+       public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi) {
                _fnameNew = fnameNew;
-               //_oi = oi;
        }
 
        @Override
@@ -69,17 +65,13 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo
                        FileSystem fs = FileSystem.get(job);
                        Path path = new Path(_fnameNew + File.separator + key);
                        writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-                       while( valueList.hasNext() )
-                       {
+                       while( valueList.hasNext() ) {
                                PairWritableBlock pair = (PairWritableBlock) valueList.next();
                                writer.append(pair.indexes, pair.block);
                        }
                } 
-               finally
-               {
-                       if( writer != null )
-                               writer.close();
+               finally {
+                       IOUtilFunctions.closeSilently(writer);
                }       
-       }
-       
+       }       
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 8663038..0c4b570 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -36,6 +36,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixDimensionsMetaData;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
@@ -55,10 +56,9 @@ public class RemoteDPParForSpark
        
        protected static final Log LOG = LogFactory.getLog(RemoteDPParForSpark.class.getName());
 
-       public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input, 
-                                                          ExecutionContext ec,
-                                                          PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
-                                                          boolean enableCPCaching, int numReducers )  //opt params
+       public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, 
+                       MatrixObject input, ExecutionContext ec, PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
+                       boolean enableCPCaching, int numReducers )  //opt params
                throws DMLRuntimeException
        {
                String jobname = "ParFor-DPESP";
@@ -71,20 +71,26 @@ public class RemoteDPParForSpark
                MatrixDimensionsMetaData md = (MatrixDimensionsMetaData) input.getMetaData();
                MatrixCharacteristics mc = md.getMatrixCharacteristics();
                InputInfo ii = InputInfo.BinaryBlockInputInfo;
-                               
-               //initialize accumulators for tasks/iterations
+
+               //initialize accumulators for tasks/iterations, and inputs
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
                LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
                LongAccumulator aIters = sc.sc().longAccumulator("iterations");
+
+               //compute number of reducers (to avoid OOMs and reduce memory pressure)
+               int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
+               int numParts2 = (int)((dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : mc.getCols()); 
+               int numReducers2 = Math.max(numReducers, Math.min(numParts, numParts2));
                
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
+               //core parfor datapartition-execute
                DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
                RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, matrixvar, itervar, 
                                          enableCPCaching, mc, tSparseCol, dpf, oi, aTasks, aIters);
                List<Tuple2<Long,String>> out = 
-                               in.flatMapToPair(dpfun)         //partition the input blocks
-                         .groupByKey(numReducers)      //group partition blocks
-                         .mapPartitionsToPair( efun )  //execute parfor tasks, incl cleanup
-                         .collect();                   //get output handles
+                               in.flatMapToPair(dpfun)       //partition the input blocks
+                         .groupByKey(numReducers2)   //group partition blocks
+                         .mapPartitionsToPair(efun)  //execute parfor tasks, incl cleanup
+                         .collect();                 //get output handles
                
                //de-serialize results
                LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index e12d010..ad0fbf8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -47,27 +47,25 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 {
        private static final long serialVersionUID = 30223759283155139L;
        
-       private String  _prog = null;
-       private boolean _caching = true;
-       private String _inputVar = null;
-       private String _iterVar = null;
+       private final String  _prog;
+       private final boolean _caching;
+       private final String _inputVar;
+       private final String _iterVar;
        
-       private OutputInfo _oinfo = null;
-       private int _rlen = -1;
-       private int _clen = -1;
-       private int _brlen = -1;
-       private int _bclen = -1;
-       private boolean _tSparseCol = false;
-       private PDataPartitionFormat _dpf = null;
+       private final OutputInfo _oinfo;
+       private final int _rlen;
+       private final int _clen;
+       private final int _brlen;
+       private final int _bclen;
+       private final boolean _tSparseCol;
+       private final PDataPartitionFormat _dpf;
        
-       private LongAccumulator _aTasks = null;
-       private LongAccumulator _aIters = null;
+       private final LongAccumulator _aTasks;
+       private final LongAccumulator _aIters;
        
        public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters) 
                throws DMLRuntimeException
        {
-               //keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
-               //when this constructor is actually called; hence, we do lazy initialization on task execution)
                _prog = program;
                _caching = cpCaching;
                _inputVar = inputVar;
@@ -78,18 +76,13 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
                _aTasks = atasks;
                _aIters = aiters;
                
-               //setup matrixblock partition and meta data
-               _rlen = (int)mc.getRows();
-               _clen = (int)mc.getCols();
+               //setup matrix block partition meta data
+               _rlen = (dpf != PDataPartitionFormat.ROW_WISE) ? (int)mc.getRows() : 1;
+               _clen = (dpf != PDataPartitionFormat.COLUMN_WISE) ? (int)mc.getCols() : 1;
                _brlen = mc.getRowsPerBlock();
                _bclen = mc.getColsPerBlock();
                _tSparseCol = tSparseCol;
                _dpf = dpf;
-               switch( _dpf ) { //create matrix partition for reuse
-                       case ROW_WISE:    _rlen = 1; break;
-                       case COLUMN_WISE: _clen = 1; break;
-                       default:  throw new RuntimeException("Partition format 
not yet supported in fused partition-execute: "+dpf);
-               }
        }
        
        @Override 
@@ -102,14 +95,14 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
                configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3
        
                //process all matrix partitions of this data partition
+               MatrixBlock partition = null;
                while( arg0.hasNext() )
                {
                        Tuple2<Long,Iterable<Writable>> larg = arg0.next();
                        
                        //collect input partition (check via equals because oinfo deserialized instance)
-                       MatrixBlock partition = null;
                        if( _oinfo.equals(OutputInfo.BinaryBlockOutputInfo) )
-                               partition = collectBinaryBlock( larg._2() );
+                               partition = collectBinaryBlock( larg._2(), partition );
                        else
                                partition = collectBinaryCellInput( larg._2() );
                        
@@ -178,42 +171,44 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
         * will overwrite the result.
         * 
         * @param valueList iterable writables
+        * @param reuse matrix block partition for reuse
         * @return matrix block
         * @throws IOException if IOException occurs
         */
-       private MatrixBlock collectBinaryBlock( Iterable<Writable> valueList ) 
+       private MatrixBlock collectBinaryBlock( Iterable<Writable> valueList, 
MatrixBlock reuse ) 
                throws IOException 
        {
-               MatrixBlock partition = null;
+               MatrixBlock partition = reuse;
                
                try
                {
                        //reset reuse block, keep configured representation
                        if( _tSparseCol )
                                partition = new MatrixBlock(_clen, _rlen, true);
+                       else if( partition!=null )
+                               partition.reset(_rlen, _clen, false);
                        else
                                partition = new MatrixBlock(_rlen, _clen, false);
 
-                       for( Writable val : valueList )
-                       {
-                               PairWritableBlock pairValue = (PairWritableBlock) val;
-                               int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
-                               int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
-                               MatrixBlock block = pairValue.block;
+                       long lnnz = 0;
+                       for( Writable val : valueList ) {
+                               PairWritableBlock pval = (PairWritableBlock) val;
+                               int row_offset = (int)(pval.indexes.getRowIndex()-1)*_brlen;
+                               int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
                                if( !partition.isInSparseFormat() ) //DENSE
-                               {
-                                       partition.copy( row_offset, row_offset+block.getNumRows()-1, 
-                                                          col_offset, col_offset+block.getNumColumns()-1,
-                                                          pairValue.block, false ); 
-                               }
+                                       partition.copy( row_offset, row_offset+pval.block.getNumRows()-1, 
+                                                          col_offset, col_offset+pval.block.getNumColumns()-1,
+                                                          pval.block, false ); 
                                else //SPARSE 
-                               {
-                                       partition.appendToSparse(pairValue.block, row_offset, col_offset);
-                               }
+                                       partition.appendToSparse(pval.block, row_offset, col_offset);
+                               lnnz += pval.block.getNonZeros();
                        }
 
-                       //final partition cleanup
-                       cleanupCollectedMatrixPartition( partition, partition.isInSparseFormat() );
+                       //post-processing: cleanups if required
+                       if( partition.isInSparseFormat() && _clen>_bclen )
+                               partition.sortSparseRows();
+                       partition.setNonZeros(lnnz);
+                       partition.examSparsity();
                }
                catch(DMLRuntimeException ex)
                {
@@ -273,29 +268,17 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
                                throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf);
                }
                
-               //final partition cleanup
-               cleanupCollectedMatrixPartition(partition, _tSparseCol);
-               
-               return partition;
-       }
-
-       private void cleanupCollectedMatrixPartition(MatrixBlock partition, boolean sort) 
-               throws IOException
-       {
-               //sort sparse row contents if required
-               if( partition.isInSparseFormat() && sort )
-                       partition.sortSparseRows();
-
-               //ensure right number of nnz
-               if( !partition.isInSparseFormat() )
-                       partition.recomputeNonZeros();
-                       
-               //exam and switch dense/sparse representation
+               //post-processing: cleanups if required
                try {
+                       if( partition.isInSparseFormat() && _tSparseCol )
+                               partition.sortSparseRows();
+                       partition.recomputeNonZeros();
                        partition.examSparsity();
                }
-               catch(Exception ex){
+               catch(DMLRuntimeException ex) {
                        throw new IOException(ex);
                }
+                       
+               return partition;
        }
 }
