Repository: systemml Updated Branches: refs/heads/master cd359c2e9 -> 45a93396a
[MINOR] Performance and cleanup spark data partitioning Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/18d98b61 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/18d98b61 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/18d98b61 Branch: refs/heads/master Commit: 18d98b61e55e71a2da51a5f0a855c98beacbcdd7 Parents: cd359c2 Author: Matthias Boehm <[email protected]> Authored: Mon Apr 30 14:02:14 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Apr 30 19:47:41 2018 -0700 ---------------------------------------------------------------------- .../runtime/controlprogram/ParForProgramBlock.java | 4 +++- .../parfor/DataPartitionerRemoteSpark.java | 4 ++-- .../parfor/DataPartitionerRemoteSparkReducer.java | 17 ++++++++--------- 3 files changed, 13 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/18d98b61/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 3d17e2f..ecbbb6f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -1441,7 +1441,9 @@ public class ParForProgramBlock extends ForProgramBlock DataPartitioner dp = null; //determine max degree of parallelism - int numReducers = ConfigurationManager.getNumReducers(); + int numReducers = OptimizerUtils.isSparkExecutionMode() ? + SparkExecutionContext.getDefaultParallelism(false) : + ConfigurationManager.getNumReducers(); int maxNumRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks(); //correction max number of reducers on yarn clusters if( InfrastructureAnalyzer.isYarnEnabled() ) http://git-wip-us.apache.org/repos/asf/systemml/blob/18d98b61/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java index b09420b..9f00dc6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java @@ -80,8 +80,8 @@ public class DataPartitionerRemoteSpark extends DataPartitioner DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format, _n); DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi, _replication); inRdd.flatMapToPair(dpfun) //partition the input blocks - .groupByKey(numRed) //group partition blocks - .foreach(wfun); //write partitions to hdfs + .groupByKey(numRed) //group partition blocks + .foreach(wfun); //write partitions to hdfs } catch(Exception ex) { throw new DMLRuntimeException(ex); http://git-wip-us.apache.org/repos/asf/systemml/blob/18d98b61/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java index ad228eb..edc8d78 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java @@ -52,7 +52,7 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo } @Override - @SuppressWarnings("deprecation") + @SuppressWarnings("deprecation") public void call(Tuple2<Long, Iterable<Writable>> arg0) throws Exception { @@ -62,24 +62,23 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo //write entire partition to binary block sequence file SequenceFile.Writer writer = null; - try - { + try { //create sequence file writer Configuration job = new Configuration(ConfigurationManager.getCachedJobConf()); + job.setInt(MRConfigurationNames.DFS_REPLICATION, _replication); + Path path = new Path(_fnameNew + File.separator + key); FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, - job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096), - (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); + writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); //write individual blocks unordered to output while( valueList.hasNext() ) { PairWritableBlock pair = (PairWritableBlock) valueList.next(); writer.append(pair.indexes, pair.block); } - } + } finally { IOUtilFunctions.closeSilently(writer); - } - } + } + } }
