Repository: incubator-systemml Updated Branches: refs/heads/master 841a4d030 -> c697c30eb
[SYSTEMML-1627] Fix guarded parallelize of matrices collected from rdds This patch fixes special cases of rdd construction, where rdds are created from in-memory matrices that have been previously collected from rdds. Specifically, this targets guarded parallelize, which exports matrices and creates the rdd from the hadoop file. So far we only exported dirty in-memory matrices but not collected matrices (which are not marked as dirty). Accordingly, subsequent rdd operations fail with file not found exceptions. This happens, for example, in special cases, where unary operations (1 input, 1 output) run in CP, but binary operations (2 inputs, 1 output) run in SPARK and we have to fall back to guarded parallelize. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a362bce0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a362bce0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a362bce0 Branch: refs/heads/master Commit: a362bce0e11ffcd163f03c0cfdf4598eed6378f3 Parents: 841a4d0 Author: Matthias Boehm <[email protected]> Authored: Wed May 24 18:57:07 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu May 25 14:48:57 2017 -0700 ---------------------------------------------------------------------- .../sysml/runtime/controlprogram/caching/CacheableData.java | 6 ++---- .../sysml/runtime/controlprogram/caching/MatrixObject.java | 6 ++++-- .../runtime/controlprogram/context/SparkExecutionContext.java | 3 +-- .../functions/misc/ValueTypeMatrixScalarBuiltinTest.java | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index c1a024a..fd6fa16 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -432,10 +432,8 @@ public abstract class CacheableData<T extends CacheBlock> extends Data _data = readBlobFromRDD( getRDDHandle(), writeStatus ); //mark for initial local write (prevent repeated execution of rdd operations) - if( writeStatus.booleanValue() ) - _requiresLocalWrite = CACHING_WRITE_CACHE_ON_READ; - else - _requiresLocalWrite = true; + _requiresLocalWrite = writeStatus.booleanValue() ? + CACHING_WRITE_CACHE_ON_READ : true; } setDirty(false); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index 94bdb2d..4105351 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -436,10 +436,12 @@ public class MatrixObject extends CacheableData<MatrixBlock> + ", dimensions: [" + mc.getRows() + ", " + mc.getCols() + ", " + mc.getNonZeros() + "]"); begin = System.currentTimeMillis(); } - - double sparsity = ( mc.getNonZeros() >= 0 ? ((double)mc.getNonZeros())/(mc.getRows()*mc.getCols()) : 1.0d) ; + + //read matrix and maintain meta data + double sparsity = (mc.getNonZeros() >= 0 ? 
((double)mc.getNonZeros())/(mc.getRows()*mc.getCols()) : 1.0d); MatrixBlock newData = DataConverter.readMatrixFromHDFS(fname, iimd.getInputInfo(), rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), sparsity, getFileFormatProperties()); + setHDFSFileExists(true); //sanity check correct output if( newData == null ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 92946ff..1dd3600 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -356,9 +356,8 @@ public class SparkExecutionContext extends ExecutionContext boolean fromFile = false; if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve( OptimizerUtils.estimatePartitionedSizeExactSparsity(mc))) { - if( mo.isDirty() ) { //write only if necessary + if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary mo.exportData(); - } rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug fromFile = true; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a362bce0/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java 
b/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java index 61ffa7d..e2124b5 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/misc/ValueTypeMatrixScalarBuiltinTest.java @@ -112,7 +112,7 @@ public class ValueTypeMatrixScalarBuiltinTest extends AutomatedTestBase loadTestConfiguration(getTestConfiguration(testName)); //setup arguments and run test - String RI_HOME = SCRIPT_DIR + TEST_DIR; + String RI_HOME = SCRIPT_DIR + TEST_DIR; fullDMLScriptName = RI_HOME + testName + ".dml"; programArgs = new String[]{"-args", vtIn==ValueType.DOUBLE ? "7.7" : "7", output("R")};
