Repository: systemml
Updated Branches:
  refs/heads/master cd359c2e9 -> 45a93396a


[MINOR] Performance improvements and cleanup of spark data partitioning

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/18d98b61
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/18d98b61
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/18d98b61

Branch: refs/heads/master
Commit: 18d98b61e55e71a2da51a5f0a855c98beacbcdd7
Parents: cd359c2
Author: Matthias Boehm <[email protected]>
Authored: Mon Apr 30 14:02:14 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Apr 30 19:47:41 2018 -0700

----------------------------------------------------------------------
 .../runtime/controlprogram/ParForProgramBlock.java |  4 +++-
 .../parfor/DataPartitionerRemoteSpark.java         |  4 ++--
 .../parfor/DataPartitionerRemoteSparkReducer.java  | 17 ++++++++---------
 3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/18d98b61/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 3d17e2f..ecbbb6f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -1441,7 +1441,9 @@ public class ParForProgramBlock extends ForProgramBlock
                DataPartitioner dp = null;
                
                //determine max degree of parallelism
-               int numReducers = ConfigurationManager.getNumReducers();
+               int numReducers = OptimizerUtils.isSparkExecutionMode() ?
+                       SparkExecutionContext.getDefaultParallelism(false) :
+                       ConfigurationManager.getNumReducers();
                int maxNumRed = 
InfrastructureAnalyzer.getRemoteParallelReduceTasks();
                //correction max number of reducers on yarn clusters
                if( InfrastructureAnalyzer.isYarnEnabled() )

http://git-wip-us.apache.org/repos/asf/systemml/blob/18d98b61/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
index b09420b..9f00dc6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
@@ -80,8 +80,8 @@ public class DataPartitionerRemoteSpark extends 
DataPartitioner
                        DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, _format, _n);
                        DataPartitionerRemoteSparkReducer wfun = new 
DataPartitionerRemoteSparkReducer(fnameNew, oi, _replication);
                        inRdd.flatMapToPair(dpfun) //partition the input blocks
-                            .groupByKey(numRed)   //group partition blocks     
                  
-                            .foreach(wfun);       //write partitions to hdfs 
+                               .groupByKey(numRed)    //group partition blocks
+                               .foreach(wfun);        //write partitions to 
hdfs
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/systemml/blob/18d98b61/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index ad228eb..edc8d78 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -52,7 +52,7 @@ public class DataPartitionerRemoteSparkReducer implements 
VoidFunction<Tuple2<Lo
        }
 
        @Override
-       @SuppressWarnings("deprecation")        
+       @SuppressWarnings("deprecation")
        public void call(Tuple2<Long, Iterable<Writable>> arg0)
                throws Exception 
        {
@@ -62,24 +62,23 @@ public class DataPartitionerRemoteSparkReducer implements 
VoidFunction<Tuple2<Lo
                
                //write entire partition to binary block sequence file
                SequenceFile.Writer writer = null;
-               try
-               {
+               try {
                        //create sequence file writer
                        Configuration job = new 
Configuration(ConfigurationManager.getCachedJobConf());
+                       job.setInt(MRConfigurationNames.DFS_REPLICATION, 
_replication);
+                       
                        Path path = new Path(_fnameNew + File.separator + key);
                        FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class, 
-                                       
job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
-                    (short)_replication, fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata()); 
+                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class);
                        
                        //write individual blocks unordered to output
                        while( valueList.hasNext() ) {
                                PairWritableBlock pair = (PairWritableBlock) 
valueList.next();
                                writer.append(pair.indexes, pair.block);
                        }
-               } 
+               }
                finally {
                        IOUtilFunctions.closeSilently(writer);
-               }       
-       }       
+               }
+       }
 }

Reply via email to