[SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b028e6ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b028e6ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b028e6ce

Branch: refs/heads/master
Commit: b028e6cee12d8cc9bb4e1728fffc852cef7282c1
Parents: e82de90
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Fri Feb 24 19:03:56 2017 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sat Feb 25 11:51:05 2017 -0800

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      | 33 +++++++++-----------
 1 file changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b028e6ce/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index af3a0d1..4cdfa7b 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -23,6 +23,7 @@ import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1063,8 +1064,8 @@ public class ParForProgramBlock extends ForProgramBlock
                if( _monitor )
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_TASKS_T, time.stop());
                
-               //write matrices to HDFS 
-               exportMatricesToHDFS(ec);
+               //write matrices to HDFS, except DP matrix which is the input 
to the RemoteDPParForSpark job
+               exportMatricesToHDFS(ec, _colocatedDPMatrix);
                                
                // Step 4) submit MR job (wait for finished work)
                OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PDataPartitionFormat.COLUMN_WISE)||
@@ -1258,37 +1259,33 @@ public class ParForProgramBlock extends ForProgramBlock
                        }
        }
 
-       private void exportMatricesToHDFS( ExecutionContext ec ) 
+       private void exportMatricesToHDFS(ExecutionContext ec, String... 
blacklistNames) 
                throws CacheException 
        {
                ParForStatementBlock sb = 
(ParForStatementBlock)getStatementBlock();
+               HashSet<String> blacklist = new 
HashSet<String>(Arrays.asList(blacklistNames));
                
                if( LIVEVAR_AWARE_EXPORT && sb != null)
                {
                        //optimization to prevent unnecessary export of matrices
                        //export only variables that are read in the body
                        VariableSet varsRead = sb.variablesRead();
-                       for (String key : ec.getVariables().keySet() ) 
-                       {
-                               Data d = ec.getVariable(key);
-                               if (    d.getDataType() == DataType.MATRIX
-                                        && varsRead.containsVariable(key)  )
-                               {
-                                       MatrixObject mo = (MatrixObject)d;
-                                       mo.exportData( _replicationExport );
+                       for (String key : ec.getVariables().keySet() ) {
+                               if( varsRead.containsVariable(key) && 
!blacklist.contains(key) ) {
+                                       Data d = ec.getVariable(key);
+                                       if( d.getDataType() == DataType.MATRIX )
+                                               
((MatrixObject)d).exportData(_replicationExport);
                                }
                        }
                }
                else
                {
                        //export all matrices in symbol table
-                       for (String key : ec.getVariables().keySet() ) 
-                       {
-                               Data d = ec.getVariable(key);
-                               if ( d.getDataType() == DataType.MATRIX )
-                               {
-                                       MatrixObject mo = (MatrixObject)d;
-                                       mo.exportData( _replicationExport );
+                       for (String key : ec.getVariables().keySet() ) {
+                               if( !blacklist.contains(key) ) {
+                                       Data d = ec.getVariable(key);
+                                       if( d.getDataType() == DataType.MATRIX )
+                                               
((MatrixObject)d).exportData(_replicationExport);
                                }
                        }
                }

Reply via email to