[SYSTEMML-1429] Performance parfor spark dp (handle replication factor) So far, we simply used the default replication factor in the parfor spark data partitioning job, which writes individual sequence files per partition after grouping. However, the parfor optimizer already selects a script/cluster-specific replication factor (e.g., a high replication factor if partitions are accessed multiple times in loops). This patch improves the parfor spark data partitioner to write partitions with the "correct" replication factor.
On an 80GB scenario with block partitioning, batch size 200, and remote data partitioning and execution (without fusion), performance improved from 512s (w/ replication factor 3) to 293s (w/ replication factor 1). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/75e7ad6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/75e7ad6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/75e7ad6e Branch: refs/heads/master Commit: 75e7ad6ec849b73c64b6a72791ed75898b1e8c33 Parents: 1aac97e Author: Matthias Boehm <mboe...@gmail.com> Authored: Tue Mar 21 20:30:31 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Tue Mar 21 20:30:31 2017 -0700 ---------------------------------------------------------------------- .../runtime/controlprogram/ParForProgramBlock.java | 7 ++++--- .../parfor/DataPartitionerRemoteMR.java | 4 ++-- .../parfor/DataPartitionerRemoteSpark.java | 12 ++++++------ .../parfor/DataPartitionerRemoteSparkReducer.java | 16 ++++++++++++---- .../controlprogram/parfor/RemoteDPParForMR.java | 2 +- .../parfor/opt/OptimizerRuleBased.java | 4 ++-- .../sysml/runtime/matrix/DataPartitionMR.java | 2 +- 7 files changed, 28 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index eec84b6..d6186f3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -975,7 +975,7 @@ public 
class ParForProgramBlock extends ForProgramBlock (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE))? OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo; RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile, - inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP, MAX_RETRYS_ON_ERROR ); + inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP ); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); @@ -1499,10 +1499,11 @@ public class ParForProgramBlock extends ForProgramBlock break; case REMOTE_MR: dp = new DataPartitionerRemoteMR( dpf, _ID, numRed, - _replicationDP, MAX_RETRYS_ON_ERROR, ALLOW_REUSE_MR_JVMS, false ); + _replicationDP, ALLOW_REUSE_MR_JVMS, false ); break; case REMOTE_SPARK: - dp = new DataPartitionerRemoteSpark( dpf, ec, numRed, false ); + dp = new DataPartitionerRemoteSpark( dpf, ec, numRed, + _replicationDP, false ); break; default: throw new DMLRuntimeException("Unknown data partitioner: '" +dataPartitioner.name()+"'."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java index 5514007..b784ae3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java @@ -55,12 +55,12 @@ public class DataPartitionerRemoteMR extends DataPartitioner private boolean _keepIndexes = false; - public DataPartitionerRemoteMR(PartitionFormat dpf, long pfid, int 
numReducers, int replication, int max_retry, boolean jvmReuse, boolean keepIndexes) + public DataPartitionerRemoteMR(PartitionFormat dpf, long pfid, int numRed, int replication, boolean jvmReuse, boolean keepIndexes) { super(dpf._dpf, dpf._N); _pfid = pfid; - _numReducers = numReducers; + _numReducers = numRed; _replication = replication; _jvmReuse = jvmReuse; _keepIndexes = keepIndexes; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java index 1569709..f7b09b1 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java @@ -42,17 +42,17 @@ import org.apache.sysml.utils.Statistics; */ public class DataPartitionerRemoteSpark extends DataPartitioner { + private final ExecutionContext _ec; + private final long _numRed; + private final int _replication; - //private boolean _keepIndexes = false; - private ExecutionContext _ec = null; - private long _numRed = -1; - - public DataPartitionerRemoteSpark(PartitionFormat dpf, ExecutionContext ec, long numRed, boolean keepIndexes) + public DataPartitionerRemoteSpark(PartitionFormat dpf, ExecutionContext ec, long numRed, int replication, boolean keepIndexes) { super(dpf._dpf, dpf._N); _ec = ec; _numRed = numRed; + _replication = replication; } @Override @@ -79,7 +79,7 @@ public class DataPartitionerRemoteSpark extends DataPartitioner //run spark remote data partition job DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format, _n); - DataPartitionerRemoteSparkReducer 
wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi); + DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi, _replication); inRdd.flatMapToPair(dpfun) //partition the input blocks .groupByKey(numRed) //group partition blocks .foreach(wfun); //write partitions to hdfs http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java index d8bb04d..174c544 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java @@ -35,6 +35,7 @@ import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import scala.Tuple2; @@ -42,10 +43,12 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo { private static final long serialVersionUID = -7149865018683261964L; - private String _fnameNew = null; + private final String _fnameNew; + private final int _replication; - public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi) { + public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi, int replication) { _fnameNew = fnameNew; + _replication = replication; } @Override @@ -60,11 +63,16 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo //write entire partition to binary block 
sequence file SequenceFile.Writer writer = null; try - { + { + //create sequence file writer Configuration job = new Configuration(ConfigurationManager.getCachedJobConf()); FileSystem fs = FileSystem.get(job); Path path = new Path(_fnameNew + File.separator + key); - writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); + writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, + job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096), + (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); + + //write individual blocks unordered to output while( valueList.hasNext() ) { PairWritableBlock pair = (PairWritableBlock) valueList.next(); writer.append(pair.indexes, pair.block); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index d01368e..a8fda17 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -72,7 +72,7 @@ public class RemoteDPParForMR public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input, PartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params - boolean enableCPCaching, int numReducers, int replication, int max_retry) //opt params + boolean enableCPCaching, int numReducers, int replication) //opt params throws DMLRuntimeException { RemoteParForJobReturn ret = null; 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index 4003975..f0eb0e7 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -1085,8 +1085,8 @@ public class OptimizerRuleBased extends Optimizer ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter .getAbstractPlanMapping().getMappedProg(n.getID())[1]; - if( n.getExecType()==ExecType.MR - && n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_MR.toString()) + if(((n.getExecType()==ExecType.MR && n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_MR.name())) + || (n.getExecType()==ExecType.SPARK && n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_SPARK.name()))) && n.hasNestedParallelism(false) && n.hasNestedPartitionReads(false) ) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/75e7ad6e/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java index 3675194..762ecde 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/DataPartitionMR.java @@ -83,7 +83,7 @@ public class DataPartitionMR throw new DMLRuntimeException("Unsupported partition format for distributed cache input: "+pformat); } PartitionFormat pf = new 
PartitionFormat(pformat, (int)N); - DataPartitioner dpart = new DataPartitionerRemoteMR(pf, -1, numReducers, replication, 4, false, true); + DataPartitioner dpart = new DataPartitionerRemoteMR(pf, -1, numReducers, replication, false, true); out = dpart.createPartitionedMatrixObject(in, out, true); sts[i] = out.getMatrixCharacteristics();