[MINOR] Avoid unnecessary stats maintenance in spark remote parfor

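This patch makes a minor performance improvement to Spark remote parfor
jobs. In contrast to the MR backend, we don't propagate individual
statistics back to the driver via counters. Therefore, we now disable
statistics maintenance in remote parfor workers when running on Spark,
which reduces overhead for parfor loops with many fine-grained operations
(where statistics maintenance can become a bottleneck). Concretely, the
Spark parfor workers now call ProgramConverter.parseParForBody with
inSpark=true, which skips parsing and setting the additional
configurations shipped with the serialized parfor body.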


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d34d6a62
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d34d6a62
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d34d6a62

Branch: refs/heads/master
Commit: d34d6a6299b56766f3a92a6cfee258fe8272545a
Parents: e8774a7
Author: Matthias Boehm <[email protected]>
Authored: Sun Apr 22 16:32:49 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sun Apr 22 16:32:49 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/parfor/ProgramConverter.java     |  8 ++++++--
 .../parfor/RemoteDPParForSparkWorker.java           | 16 +++++-----------
 .../parfor/RemoteParForSparkWorker.java             | 11 +++--------
 3 files changed, 14 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/d34d6a62/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 1356634..919b357 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -1254,8 +1254,11 @@ public class ProgramConverter
        ////////////////////////////////
        // PARSING 
        ////////////////////////////////
-
        public static ParForBody parseParForBody( String in, int id ) {
+               return parseParForBody(in, id, false);
+       }
+       
+       public static ParForBody parseParForBody( String in, int id, boolean 
inSpark ) {
                ParForBody body = new ParForBody();
                
                //header elimination
@@ -1284,7 +1287,8 @@ public class ProgramConverter
                
                //handle additional configs
                String aconfs = st.nextToken();
-               parseAndSetAdditionalConfigurations( aconfs );
+               if( !inSpark )
+                       parseAndSetAdditionalConfigurations( aconfs );
                
                //handle program
                String progStr = st.nextToken();

http://git-wip-us.apache.org/repos/asf/systemml/blob/d34d6a62/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 4ab90ae..885b2b7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -20,7 +20,6 @@
 package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -98,8 +97,6 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        public Iterator<Tuple2<Long, String>> call(Iterator<Tuple2<Long, 
Iterable<Writable>>> arg0)
                throws Exception 
        {
-               ArrayList<Tuple2<Long,String>> ret = new ArrayList<>();
-               
                //lazy parworker initialization
                configureWorker( TaskContext.get().taskAttemptId() );
        
@@ -132,12 +129,9 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                        _aIters.add( (int)(getExecutedIterations()-numIter) );
                }
                
-               //write output if required (matrix indexed write) 
-               ArrayList<String> tmp = 
RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), 
_resultVars );
-               for( String val : tmp )
-                       ret.add(new Tuple2<>(_workerID, val));
-               
-               return ret.iterator();
+               //write output if required (matrix indexed write)
+               return RemoteParForUtils.exportResultVariables(_workerID, 
_ec.getVariables(), _resultVars)
+                       .stream().map(s -> new Tuple2<>(_workerID, 
s)).iterator();
        }
 
        private void configureWorker( long ID ) 
@@ -150,7 +144,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                        CodegenUtils.getClassSync(e.getKey(), e.getValue());
        
                //parse and setup parfor body program
-               ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
+               ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID, true);
                _childBlocks = body.getChildBlocks();
                _ec          = body.getEc();
                _resultVars  = body.getResultVariables();
@@ -171,7 +165,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                                                
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
                                //register entire working dir for delete on 
shutdown
                                
RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
-                       }       
+                       }
                }
                
                //ensure that resultvar files are not removed

http://git-wip-us.apache.org/repos/asf/systemml/blob/d34d6a62/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 45e3bc7..7485602 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -20,7 +20,6 @@
 package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -83,12 +82,8 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                
                //write output if required (matrix indexed write) 
                //note: this copy is necessary for environments without spark 
libraries
-               ArrayList<Tuple2<Long,String>> ret = new ArrayList<>();
-               ArrayList<String> tmp = 
RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), 
_resultVars );
-               for( String val : tmp )
-                       ret.add(new Tuple2<>(_workerID, val));
-               
-               return ret.iterator();
+               return RemoteParForUtils.exportResultVariables(_workerID, 
_ec.getVariables(), _resultVars)
+                       .stream().map(s -> new Tuple2<>(_workerID, 
s)).iterator();
        }
        
        private void configureWorker(long taskID) 
@@ -101,7 +96,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                        CodegenUtils.getClassSync(e.getKey(), e.getValue());
        
                //parse and setup parfor body program
-               ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID);
+               ParForBody body = ProgramConverter.parseParForBody(_prog, 
(int)_workerID, true);
                _childBlocks = body.getChildBlocks();
                _ec          = body.getEc();
                _resultVars  = body.getResultVariables();

Reply via email to