Repository: incubator-systemml
Updated Branches:
  refs/heads/master 841a4d030 -> c697c30eb


[SYSTEMML-1627] Fix guarded parallelize of matrices collected from rdds

This patch fixes special cases of rdd construction where rdds are
created from in-memory matrices that have previously been collected
from rdds. Specifically, this targets guarded parallelize, which
exports matrices and creates the rdd from the hadoop file. So far, we
only exported dirty in-memory matrices but not collected matrices
(which are not marked as dirty). Accordingly, subsequent rdd operations
failed with file-not-found exceptions. This happens, for example, in
special cases where unary operations (1 input, 1 output) run in CP, but
binary operations (2 inputs, 1 output) run in SPARK and we have to fall
back to guarded parallelize.
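
For illustration, the following minimal Java sketch mirrors the corrected
export check described above. MatrixHandle and exportIfNecessary are
hypothetical stand-ins for SystemML's MatrixObject and the guarded
parallelize path in SparkExecutionContext, not the actual API.

public class GuardedParallelizeSketch {
	static class MatrixHandle {
		boolean dirty;          // in-memory data newer than the file on HDFS
		boolean hdfsFileExists; // a readable copy already exists on HDFS
		void exportData() { /* write in-memory blocks to HDFS (stubbed) */ }
	}

	// Returns true if the matrix had to be exported before creating the rdd
	// from its HDFS file (i.e., reading the file would otherwise fail).
	static boolean exportIfNecessary(MatrixHandle mo) {
		// Old behavior: export only if dirty. A matrix collected from an rdd
		// is not dirty, yet has no file on HDFS -> file-not-found on read.
		// New behavior: also export when no HDFS file exists.
		if (mo.dirty || !mo.hdfsFileExists) {
			mo.exportData();
			return true;
		}
		return false;
	}

	public static void main(String[] args) {
		MatrixHandle collected = new MatrixHandle();
		collected.dirty = false;          // collected matrices are not dirty
		collected.hdfsFileExists = false; // and were never written to HDFS
		System.out.println("export needed: " + exportIfNecessary(collected)); // true
	}
}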


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a362bce0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a362bce0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a362bce0

Branch: refs/heads/master
Commit: a362bce0e11ffcd163f03c0cfdf4598eed6378f3
Parents: 841a4d0
Author: Matthias Boehm <[email protected]>
Authored: Wed May 24 18:57:07 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu May 25 14:48:57 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/controlprogram/caching/CacheableData.java    | 6 ++----
 .../sysml/runtime/controlprogram/caching/MatrixObject.java     | 6 ++++--
 .../runtime/controlprogram/context/SparkExecutionContext.java  | 3 +--
 .../functions/misc/ValueTypeMatrixScalarBuiltinTest.java       | 2 +-
 4 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index c1a024a..fd6fa16 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -432,10 +432,8 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 					_data = readBlobFromRDD( getRDDHandle(), writeStatus );
 					
 					//mark for initial local write (prevent repeated execution of rdd operations)
-					if( writeStatus.booleanValue() )
-						_requiresLocalWrite = CACHING_WRITE_CACHE_ON_READ;
-					else
-						_requiresLocalWrite = true;
+					_requiresLocalWrite = writeStatus.booleanValue() ?
+						CACHING_WRITE_CACHE_ON_READ : true;
 				}
 				
 				setDirty(false);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 94bdb2d..4105351 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -436,10 +436,12 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 				+ ", dimensions: [" + mc.getRows() + ", " + mc.getCols() + ", " + mc.getNonZeros() + "]");
 			begin = System.currentTimeMillis();
 		}
-			
-		double sparsity = ( mc.getNonZeros() >= 0 ? ((double)mc.getNonZeros())/(mc.getRows()*mc.getCols()) : 1.0d) ;
+		
+		//read matrix and maintain meta data
+		double sparsity = (mc.getNonZeros() >= 0 ? ((double)mc.getNonZeros())/(mc.getRows()*mc.getCols()) : 1.0d);
 		MatrixBlock newData = DataConverter.readMatrixFromHDFS(fname, iimd.getInputInfo(), rlen, clen,
 				mc.getRowsPerBlock(), mc.getColsPerBlock(), sparsity, getFileFormatProperties());
+		setHDFSFileExists(true);
 		
 		//sanity check correct output
 		if( newData == null )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 92946ff..1dd3600 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -356,9 +356,8 @@ public class SparkExecutionContext extends ExecutionContext
 			boolean fromFile = false;
 			if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve(
 					OptimizerUtils.estimatePartitionedSizeExactSparsity(mc))) {
-				if( mo.isDirty() ) { //write only if necessary
+				if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary
 					mo.exportData();
-				}
 				rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
 				rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug
 				fromFile = true;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java b/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java
index 61ffa7d..e2124b5 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java
@@ -112,7 +112,7 @@ public class ValueTypeMatrixScalarBuiltinTest extends AutomatedTestBase
 		loadTestConfiguration(getTestConfiguration(testName));
 		
 		//setup arguments and run test
-        String RI_HOME = SCRIPT_DIR + TEST_DIR;
+		String RI_HOME = SCRIPT_DIR + TEST_DIR;
 		fullDMLScriptName = RI_HOME + testName + ".dml";
 		programArgs = new String[]{"-args",
 			vtIn==ValueType.DOUBLE ? "7.7" : "7", output("R")};
