[SYSTEMML-1429] Performance parfor spark dp (handle replication factor)

So far, we simply used the default replication factor in the parfor
Spark data partitioning job, which writes individual sequence files per
partition after grouping. However, the parfor optimizer already selects
a script/cluster-specific replication factor (e.g., a high replication
factor if partitions are accessed multiple times in loops). This patch
improves the parfor Spark data partitioner to write partitions with the
replication factor selected by the optimizer (the "correct" one).

On an 80GB scenario with block partitioning, batch size 200, and remote
data partitioning and execution (without fusion), performance improved
from 512s (with replication factor 3) to 293s (with replication factor 1).


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/75e7ad6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/75e7ad6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/75e7ad6e

Branch: refs/heads/master
Commit: 75e7ad6ec849b73c64b6a72791ed75898b1e8c33
Parents: 1aac97e
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Tue Mar 21 20:30:31 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Tue Mar 21 20:30:31 2017 -0700

----------------------------------------------------------------------
 .../runtime/controlprogram/ParForProgramBlock.java  |  7 ++++---
 .../parfor/DataPartitionerRemoteMR.java             |  4 ++--
 .../parfor/DataPartitionerRemoteSpark.java          | 12 ++++++------
 .../parfor/DataPartitionerRemoteSparkReducer.java   | 16 ++++++++++++----
 .../controlprogram/parfor/RemoteDPParForMR.java     |  2 +-
 .../parfor/opt/OptimizerRuleBased.java              |  4 ++--
 .../sysml/runtime/matrix/DataPartitionMR.java       |  2 +-
 7 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index eec84b6..d6186f3 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -975,7 +975,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                              (inputMatrix.getSparsity()<0.001 
&& inputDPF==PartitionFormat.ROW_WISE))? 
                                             OutputInfo.BinaryCellOutputInfo : 
OutputInfo.BinaryBlockOutputInfo;
                RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program, resultFile, 
-                               inputMatrix, inputDPF, inputOI, _tSparseCol, 
_enableCPCaching, _numThreads, _replicationDP, MAX_RETRYS_ON_ERROR );
+                               inputMatrix, inputDPF, inputOI, _tSparseCol, 
_enableCPCaching, _numThreads, _replicationDP );
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());
@@ -1499,10 +1499,11 @@ public class ParForProgramBlock extends ForProgramBlock
                                break;
                        case REMOTE_MR:
                                dp = new DataPartitionerRemoteMR( dpf, _ID, 
numRed,
-                                       _replicationDP, MAX_RETRYS_ON_ERROR, 
ALLOW_REUSE_MR_JVMS, false );
+                                               _replicationDP, 
ALLOW_REUSE_MR_JVMS, false );
                                break;
                        case REMOTE_SPARK:
-                               dp = new DataPartitionerRemoteSpark( dpf, ec, 
numRed, false );
+                               dp = new DataPartitionerRemoteSpark( dpf, ec, 
numRed,
+                                               _replicationDP, false );
                                break;  
                        default:
                                throw new DMLRuntimeException("Unknown data 
partitioner: '" +dataPartitioner.name()+"'.");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
index 5514007..b784ae3 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
@@ -55,12 +55,12 @@ public class DataPartitionerRemoteMR extends DataPartitioner
        private boolean _keepIndexes = false;
        
        
-       public DataPartitionerRemoteMR(PartitionFormat dpf, long pfid, int 
numReducers, int replication, int max_retry, boolean jvmReuse, boolean 
keepIndexes) 
+       public DataPartitionerRemoteMR(PartitionFormat dpf, long pfid, int 
numRed, int replication, boolean jvmReuse, boolean keepIndexes) 
        {
                super(dpf._dpf, dpf._N);
                
                _pfid = pfid;
-               _numReducers = numReducers;
+               _numReducers = numRed;
                _replication = replication;
                _jvmReuse = jvmReuse;
                _keepIndexes = keepIndexes;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
index 1569709..f7b09b1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
@@ -42,17 +42,17 @@ import org.apache.sysml.utils.Statistics;
  */
 public class DataPartitionerRemoteSpark extends DataPartitioner
 {      
+       private final ExecutionContext _ec;
+       private final long _numRed;
+       private final int _replication;
        
-       //private boolean _keepIndexes = false;
-       private ExecutionContext _ec = null;
-       private long _numRed = -1;
-       
-       public DataPartitionerRemoteSpark(PartitionFormat dpf, ExecutionContext 
ec, long numRed, boolean keepIndexes) 
+       public DataPartitionerRemoteSpark(PartitionFormat dpf, ExecutionContext 
ec, long numRed, int replication, boolean keepIndexes) 
        {
                super(dpf._dpf, dpf._N);
                
                _ec = ec;
                _numRed = numRed;
+               _replication = replication;
        }
 
        @Override
@@ -79,7 +79,7 @@ public class DataPartitionerRemoteSpark extends 
DataPartitioner
        
                        //run spark remote data partition job 
                        DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, _format, _n);
-                       DataPartitionerRemoteSparkReducer wfun = new 
DataPartitionerRemoteSparkReducer(fnameNew, oi);
+                       DataPartitionerRemoteSparkReducer wfun = new 
DataPartitionerRemoteSparkReducer(fnameNew, oi, _replication);
                        inRdd.flatMapToPair(dpfun) //partition the input blocks
                             .groupByKey(numRed)   //group partition blocks     
                  
                             .foreach(wfun);       //write partitions to hdfs 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index d8bb04d..174c544 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -35,6 +35,7 @@ import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 
 import scala.Tuple2;
 
@@ -42,10 +43,12 @@ public class DataPartitionerRemoteSparkReducer implements 
VoidFunction<Tuple2<Lo
 {
        private static final long serialVersionUID = -7149865018683261964L;
        
-       private String _fnameNew = null;
+       private final String _fnameNew;
+       private final int _replication;
        
-       public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo 
oi) {
+       public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo 
oi, int replication) {
                _fnameNew = fnameNew;
+               _replication = replication;
        }
 
        @Override
@@ -60,11 +63,16 @@ public class DataPartitionerRemoteSparkReducer implements 
VoidFunction<Tuple2<Lo
                //write entire partition to binary block sequence file
                SequenceFile.Writer writer = null;
                try
-               {                       
+               {
+                       //create sequence file writer
                        Configuration job = new 
Configuration(ConfigurationManager.getCachedJobConf());
                        FileSystem fs = FileSystem.get(job);
                        Path path = new Path(_fnameNew + File.separator + key);
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class);
+                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class, 
+                                       
job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
+                    (short)_replication, fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata()); 
+                       
+                       //write individual blocks unordered to output
                        while( valueList.hasNext() ) {
                                PairWritableBlock pair = (PairWritableBlock) 
valueList.next();
                                writer.append(pair.indexes, pair.block);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index d01368e..a8fda17 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -72,7 +72,7 @@ public class RemoteDPParForMR
 
        public static RemoteParForJobReturn runJob(long pfid, String itervar, 
String matrixvar, String program, 
                        String resultFile, MatrixObject input, PartitionFormat 
dpf, OutputInfo oi, boolean tSparseCol, //config params
-                       boolean enableCPCaching, int numReducers, int 
replication, int max_retry)  //opt params
+                       boolean enableCPCaching, int numReducers, int 
replication)  //opt params
                throws DMLRuntimeException
        {
                RemoteParForJobReturn ret = null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 4003975..f0eb0e7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1085,8 +1085,8 @@ public class OptimizerRuleBased extends Optimizer
                ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
                                                                
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
                
-               if(    n.getExecType()==ExecType.MR
-                       && 
n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_MR.toString())
+               if(((n.getExecType()==ExecType.MR && 
n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_MR.name()))
+                   || (n.getExecType()==ExecType.SPARK && 
n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_SPARK.name())))
                    && n.hasNestedParallelism(false) 
                    && n.hasNestedPartitionReads(false) )               
                {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
index 3675194..762ecde 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java
@@ -83,7 +83,7 @@ public class DataPartitionMR
                                                throw new 
DMLRuntimeException("Unsupported partition format for distributed cache input: 
"+pformat);
                                }
                                PartitionFormat pf = new 
PartitionFormat(pformat, (int)N);
-                               DataPartitioner dpart = new 
DataPartitionerRemoteMR(pf, -1, numReducers, replication, 4, false, true);
+                               DataPartitioner dpart = new 
DataPartitionerRemoteMR(pf, -1, numReducers, replication, false, true);
                                out = dpart.createPartitionedMatrixObject(in, 
out, true);
                                
                                sts[i] = out.getMatrixCharacteristics();

Reply via email to