Repository: systemml Updated Branches: refs/heads/master 80dcb5280 -> 2cf78819d
[SYSTEMML-2425] Fix robustness parallel csv read for incorrect meta data This patch improves the robustness of the parallel csv reader so that it properly handles incorrectly specified meta data (i.e., the provided dimensions do not match the dimensions actually read) by either (1) padding the matrix and raising a warning if the specified dimensions are too large, or (2) raising an exception if the specified dimensions are too small, for consistency with the sequential readers and the already compiled execution plan. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2cf78819 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2cf78819 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2cf78819 Branch: refs/heads/master Commit: 2cf78819d87a12afbda7ccf08ffa0d894e3207c7 Parents: 80dcb52 Author: Matthias Boehm <[email protected]> Authored: Mon Jun 25 20:55:10 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Jun 25 20:55:10 2018 -0700 ---------------------------------------------------------------------- .../apache/sysml/runtime/io/MatrixReader.java | 4 ++++ .../sysml/runtime/io/ReaderTextCSVParallel.java | 25 ++++++++++++++++---- 2 files changed, 24 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/2cf78819/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java index 3109065..61a5893 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java @@ -29,6 +29,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.sysml.hops.OptimizerUtils; @@ -47,6 +49,8 @@ import org.apache.sysml.runtime.util.MapReduceTool; */ public abstract class MatrixReader { + protected static final Log LOG = LogFactory.getLog(MatrixReader.class.getName()); + //internal configuration protected static final boolean AGGREGATE_BLOCK_NNZ = true; protected static final boolean RETURN_EMPTY_NNZ0 = true; http://git-wip-us.apache.org/repos/asf/systemml/blob/2cf78819/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 440992a..53cf1d4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -90,8 +90,8 @@ public class ReaderTextCSVParallel extends MatrixReader // allocate output matrix block // First Read Pass (count rows/cols, determine offsets, allocate matrix block) - MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, - path, job, _props.hasHeader(), _props.getDelim(), estnnz); + MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, job, + _props.hasHeader(), _props.getDelim(), rlen, clen, estnnz); rlen = ret.getNumRows(); clen = ret.getNumColumns(); @@ -161,9 +161,9 @@ public class ReaderTextCSVParallel extends MatrixReader } } - private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock( - InputSplit[] splits, Path path, JobConf job, boolean hasHeader, - String delim, long estnnz) throws IOException, DMLRuntimeException + private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] splits, Path path, + JobConf job, boolean hasHeader, String delim, long rlen, long clen, long estnnz) + throws IOException, DMLRuntimeException { int 
nrow = 0; int ncol = 0; @@ -214,6 +214,21 @@ public class ReaderTextCSVParallel extends MatrixReader throw new IOException("Threadpool Error " + e.getMessage(), e); } + //robustness for wrong dimensions which are already compiled into the plan + if( (rlen != -1 && nrow != rlen) || (clen != -1 && ncol != clen) ) { + String msg = "Read matrix dimensions differ from meta data: ["+nrow+"x"+ncol+"] vs. ["+rlen+"x"+clen+"]."; + if( rlen < nrow || clen < ncol ) { + //a) specified matrix dimensions too small + throw new DMLRuntimeException(msg); + } + else { + //b) specified matrix dimensions too large -> padding and warning + LOG.warn(msg); + nrow = (int) rlen; + ncol = (int) clen; + } + } + // allocate target matrix block based on given size; // need to allocate sparse as well since lock-free insert into target long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
