[SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b028e6ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b028e6ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b028e6ce
Branch: refs/heads/master Commit: b028e6cee12d8cc9bb4e1728fffc852cef7282c1 Parents: e82de90 Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Feb 24 19:03:56 2017 -0800 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sat Feb 25 11:51:05 2017 -0800 ---------------------------------------------------------------------- .../controlprogram/ParForProgramBlock.java | 33 +++++++++----------- 1 file changed, 15 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b028e6ce/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index af3a0d1..4cdfa7b 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -23,6 +23,7 @@ import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -1063,8 +1064,8 @@ public class ParForProgramBlock extends ForProgramBlock if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop()); - //write matrices to HDFS - exportMatricesToHDFS(ec); + //write matrices to HDFS, except DP matrix which is the input to the RemoteDPParForSpark job + exportMatricesToHDFS(ec, _colocatedDPMatrix); // Step 4) submit MR job (wait for finished work) OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)|| @@ -1258,37 +1259,33 @@ public class ParForProgramBlock extends ForProgramBlock } } - private void exportMatricesToHDFS( 
ExecutionContext ec ) + private void exportMatricesToHDFS(ExecutionContext ec, String... blacklistNames) throws CacheException { ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock(); + HashSet<String> blacklist = new HashSet<String>(Arrays.asList(blacklistNames)); if( LIVEVAR_AWARE_EXPORT && sb != null) { //optimization to prevent unnecessary export of matrices //export only variables that are read in the body VariableSet varsRead = sb.variablesRead(); - for (String key : ec.getVariables().keySet() ) - { - Data d = ec.getVariable(key); - if ( d.getDataType() == DataType.MATRIX - && varsRead.containsVariable(key) ) - { - MatrixObject mo = (MatrixObject)d; - mo.exportData( _replicationExport ); + for (String key : ec.getVariables().keySet() ) { + if( varsRead.containsVariable(key) && !blacklist.contains(key) ) { + Data d = ec.getVariable(key); + if( d.getDataType() == DataType.MATRIX ) + ((MatrixObject)d).exportData(_replicationExport); } } } else { //export all matrices in symbol table - for (String key : ec.getVariables().keySet() ) - { - Data d = ec.getVariable(key); - if ( d.getDataType() == DataType.MATRIX ) - { - MatrixObject mo = (MatrixObject)d; - mo.exportData( _replicationExport ); + for (String key : ec.getVariables().keySet() ) { + if( !blacklist.contains(key) ) { + Data d = ec.getVariable(key); + if( d.getDataType() == DataType.MATRIX ) + ((MatrixObject)d).exportData(_replicationExport); } } }