Repository: systemml Updated Branches: refs/heads/master 85cb9e34e -> d753af90d
[SYSTEMML-2135] I/O operation support for matrices with zero rows/cols This patch completes the support for matrices with zero rows or columns by extending the readers and writers for all formats and all backends accordingly. Furthermore, this also includes a number of minor modifications to prevent incorrect validation errors. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d753af90 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d753af90 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d753af90 Branch: refs/heads/master Commit: d753af90da8bfd07e4feac34db476e139eeb217d Parents: 85cb9e3 Author: Matthias Boehm <[email protected]> Authored: Mon Feb 12 20:40:20 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Mon Feb 12 20:40:20 2018 -0800 ---------------------------------------------------------------------- .../org/apache/sysml/parser/DataExpression.java | 37 ++-- .../parfor/ResultMergeLocalFile.java | 4 +- .../instructions/spark/WriteSPInstruction.java | 5 + .../spark/utils/RDDConverterUtils.java | 1 + .../instructions/spark/utils/SparkUtils.java | 4 +- .../sysml/runtime/io/FrameWriterTextCell.java | 7 +- .../sysml/runtime/io/IOUtilFunctions.java | 3 + .../apache/sysml/runtime/io/ReaderTextCell.java | 6 +- .../runtime/io/ReaderTextCellParallel.java | 14 +- .../sysml/runtime/io/WriterMatrixMarket.java | 7 +- .../apache/sysml/runtime/io/WriterTextCSV.java | 5 +- .../sysml/runtime/io/WriterTextCSVParallel.java | 22 +- .../apache/sysml/runtime/io/WriterTextCell.java | 8 +- .../runtime/matrix/MatrixCharacteristics.java | 3 +- .../apache/sysml/runtime/matrix/WriteCSVMR.java | 7 + .../matrix/data/TextToBinaryCellConverter.java | 14 +- .../runtime/matrix/mapred/ReblockMapper.java | 15 +- .../functions/data/WriteReadZeroDimsTest.java | 217 +++++++++++++++++++ .../scripts/functions/data/ZeroDimDataRead.dml | 29 +++ 
.../scripts/functions/data/ZeroDimDataWrite.dml | 23 ++ .../functions/data/ZPackageSuite.java | 1 + 21 files changed, 359 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/parser/DataExpression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java index d17443f..9764dfc 100644 --- a/src/main/java/org/apache/sysml/parser/DataExpression.java +++ b/src/main/java/org/apache/sysml/parser/DataExpression.java @@ -682,7 +682,6 @@ public class DataExpression extends DataIdentifier // process 2nd line of MatrixMarket format -- must have size information - String secondLine = headerLines[1]; String[] sizeInfo = secondLine.trim().split("\\s+"); if (sizeInfo.length != 3){ @@ -690,39 +689,37 @@ public class DataExpression extends DataIdentifier headerLines[1] + ". Only supported format in MatrixMarket file has size line: <NUM ROWS> <NUM COLS> <NUM NON-ZEROS>, where each value is an integer.", conditional); } - long rowsCount = -1, colsCount = -1, nnzCount = -1; try { - rowsCount = Long.parseLong(sizeInfo[0]); - if (rowsCount < 1) + long rowsCount = Long.parseLong(sizeInfo[0]); + if (rowsCount < 0) throw new Exception("invalid rows count"); addVarParam(READROWPARAM, new IntIdentifier(rowsCount, this)); } catch (Exception e) { - raiseValidateError( - "In MatrixMarket file " + getVarParam(IO_FILENAME) + " invalid row count " + sizeInfo[0] - + " (must be long value >= 1). Sizing info line from file: " + headerLines[1], - conditional, LanguageErrorCodes.INVALID_PARAMETERS); + raiseValidateError("In MatrixMarket file " + getVarParam(IO_FILENAME) + " invalid row count " + + sizeInfo[0] + " (must be long value >= 0). 
Sizing info line from file: " + headerLines[1], + conditional, LanguageErrorCodes.INVALID_PARAMETERS); } try { - colsCount = Long.parseLong(sizeInfo[1]); - if (colsCount < 1) + long colsCount = Long.parseLong(sizeInfo[1]); + if (colsCount < 0) throw new Exception("invalid cols count"); addVarParam(READCOLPARAM, new IntIdentifier(colsCount, this)); } catch (Exception e) { raiseValidateError("In MatrixMarket file " + getVarParam(IO_FILENAME) + " invalid column count " - + sizeInfo[1] + " (must be long value >= 1). Sizing info line from file: " + + sizeInfo[1] + " (must be long value >= 0). Sizing info line from file: " + headerLines[1], conditional, LanguageErrorCodes.INVALID_PARAMETERS); } try { - nnzCount = Long.parseLong(sizeInfo[2]); - if (nnzCount < 1) + long nnzCount = Long.parseLong(sizeInfo[2]); + if (nnzCount < 0) throw new Exception("invalid nnz count"); addVarParam("nnz", new IntIdentifier(nnzCount, this)); } catch (Exception e) { raiseValidateError("In MatrixMarket file " + getVarParam(IO_FILENAME) + " invalid number non-zeros " + sizeInfo[2] - + " (must be long value >= 1). Sizing info line from file: " + headerLines[1], + + " (must be long value >= 0). 
Sizing info line from file: " + headerLines[1], conditional, LanguageErrorCodes.INVALID_PARAMETERS); } } @@ -857,17 +854,17 @@ public class DataExpression extends DataIdentifier if ( !isCSV && ConfigurationManager.getCompilerConfig() .getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) //skip check for csv format / jmlc api && (getVarParam(READROWPARAM) == null || getVarParam(READCOLPARAM) == null) ) { - raiseValidateError("Missing or incomplete dimension information in read statement: " - + mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS); + raiseValidateError("Missing or incomplete dimension information in read statement: " + + mtdFileName, conditional, LanguageErrorCodes.INVALID_PARAMETERS); } if (getVarParam(READROWPARAM) instanceof ConstIdentifier - && getVarParam(READCOLPARAM) instanceof ConstIdentifier) + && getVarParam(READCOLPARAM) instanceof ConstIdentifier) { // these are strings that are long values Long dim1 = (getVarParam(READROWPARAM) == null) ? null : Long.valueOf( getVarParam(READROWPARAM).toString()); - Long dim2 = (getVarParam(READCOLPARAM) == null) ? null : Long.valueOf( getVarParam(READCOLPARAM).toString()); - if ( !isCSV && (dim1 <= 0 || dim2 <= 0) && ConfigurationManager + Long dim2 = (getVarParam(READCOLPARAM) == null) ? 
null : Long.valueOf( getVarParam(READCOLPARAM).toString()); + if ( !isCSV && (dim1 < 0 || dim2 < 0) && ConfigurationManager .getCompilerConfig().getBool(ConfigType.REJECT_READ_WRITE_UNKNOWNS) ) { raiseValidateError("Invalid dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS); } @@ -877,7 +874,7 @@ public class DataExpression extends DataIdentifier getOutput().setDimensions(dim1, dim2); } else if (!isCSV && ((dim1 != null) || (dim2 != null))) { raiseValidateError("Partial dimension information in read statement", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } + } } // initialize block dimensions to UNKNOWN http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index dcb85c8..0768328 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -849,10 +849,10 @@ public class ResultMergeLocalFile extends ResultMerge } } } - } + } if( !written ) - out.write("1 1 0\n"); + out.write(IOUtilFunctions.EMPTY_TEXT_LINE); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java index 5894e76..76f8c8f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java @@ -189,6 +189,11 @@ public class WriteSPInstruction extends SPInstruction { } else if( oi == OutputInfo.CSVOutputInfo ) { + if( mc.getRows() == 0 || mc.getCols() == 0 ) { + throw new IOException("Write of matrices with zero rows or columns" + + " not supported ("+mc.getRows()+"x"+mc.getCols()+")."); + } + LongAccumulator aNnz = null; //piggyback nnz computation on actual write http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java index e3ab541..2dedc91 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java @@ -535,6 +535,7 @@ public class RDDConverterUtils st.reset( strVal ); long row = st.nextLong(); long col = st.nextLong(); + if( row == 0 || col == 0 ) continue; double val = st.nextDouble(); //flush buffer if necessary http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java index 977cd10..801fe5a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java @@ -197,8 +197,8 @@ public class SparkUtils public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
getEmptyBlockRDD( JavaSparkContext sc, MatrixCharacteristics mc ) { //compute degree of parallelism and block ranges - long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.max( - mc.getRows(), mc.getRowsPerBlock()), Math.max(mc.getCols(), mc.getColsPerBlock())); + long size = mc.getNumBlocks() * OptimizerUtils.estimateSizeEmptyBlock(Math.min( + Math.max(mc.getRows(),1), mc.getRowsPerBlock()), Math.min(Math.max(mc.getCols(),1), mc.getColsPerBlock())); int par = (int) Math.min(Math.max(SparkExecutionContext.getDefaultParallelism(true), Math.ceil(size/InfrastructureAnalyzer.getHDFSBlockSize())), mc.getNumBlocks()); long pNumBlocks = (long)Math.ceil((double)mc.getNumBlocks()/par); http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java index 2bbd206..be11fea 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java @@ -128,12 +128,11 @@ public class FrameWriterTextCell extends FrameWriter } //handle empty result - if ( !entriesWritten ) { - br.write("1 1 0\n"); - } + if ( !entriesWritten ) + br.write(IOUtilFunctions.EMPTY_TEXT_LINE); } finally { IOUtilFunctions.closeSilently(br); - } + } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 526ad98..4d388c9 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ 
b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -55,6 +55,9 @@ public class IOUtilFunctions { private static final Log LOG = LogFactory.getLog(UtilFunctions.class.getName()); + //for empty text lines we use 0-0 despite for 1-based indexing in order + //to allow matrices with zero rows and columns (consistent with R) + public static final String EMPTY_TEXT_LINE = "0 0 0\n"; private static final char CSV_QUOTE_CHAR = '"'; public static FileSystem getFileSystem(String fname) throws IOException { http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java index dd5ae51..918ab6b 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java @@ -128,6 +128,7 @@ public class ReaderTextCell extends MatrixReader st.reset( value.toString() ); //reinit tokenizer row = st.nextInt() - 1; col = st.nextInt() - 1; + if(row == -1 || col == -1) continue; double lvalue = st.nextDouble(); dest.appendValue(row, col, lvalue); } @@ -141,6 +142,7 @@ public class ReaderTextCell extends MatrixReader st.reset( value.toString() ); //reinit tokenizer row = st.nextInt()-1; col = st.nextInt()-1; + if(row == -1 || col == -1) continue; double lvalue = st.nextDouble(); a.set( row, col, lvalue ); } @@ -174,7 +176,7 @@ public class ReaderTextCell extends MatrixReader private static void readRawTextCellMatrixFromInputStream( InputStream is, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket ) throws IOException { - BufferedReader br = new BufferedReader(new InputStreamReader( is )); + BufferedReader br = new BufferedReader(new InputStreamReader( is )); boolean sparse = dest.isInSparseFormat(); String 
value = null; @@ -214,6 +216,7 @@ public class ReaderTextCell extends MatrixReader st.reset( value ); //reinit tokenizer row = st.nextInt()-1; col = st.nextInt()-1; + if(row == -1 || col == -1) continue; double lvalue = st.nextDouble(); dest.appendValue(row, col, lvalue); } @@ -227,6 +230,7 @@ public class ReaderTextCell extends MatrixReader st.reset( value ); //reinit tokenizer row = st.nextInt()-1; col = st.nextInt()-1; + if(row == -1 || col == -1) continue; double lvalue = st.nextDouble(); a.set( row, col, lvalue ); } http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index b692cb1..6087210 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -83,7 +83,7 @@ public class ReaderTextCellParallel extends MatrixReader throws IOException, DMLRuntimeException { //prepare file access - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path( fname ); FileSystem fs = IOUtilFunctions.getFileSystem(path, job); @@ -214,10 +214,12 @@ public class ReaderTextCellParallel extends MatrixReader st.reset( value.toString() ); //reinit tokenizer row = st.nextInt()-1; col = st.nextInt()-1; - double lvalue = st.nextDoubleForParallel(); - synchronized( _dest ){ //sparse requires lock - _dest.appendValue(row, col, lvalue); - lnnz++; + if(row != -1 && col != -1) { //skip cells with a 0 index (e.g., EMPTY_TEXT_LINE), same as the other readers + double lvalue = st.nextDoubleForParallel(); + synchronized( _dest ){ //sparse requires lock + _dest.appendValue(row, col, lvalue); + lnnz++; + } } } } @@ -230,6 +232,7 @@ public class ReaderTextCellParallel 
extends
MatrixReader st.reset( value.toString() ); //reinit tokenizer row = st.nextInt() - 1; col = st.nextInt() - 1; + if(row == -1 || col == -1) continue; double lvalue = st.nextDoubleForParallel(); buff.addCell(row, col, lvalue); @@ -254,6 +257,7 @@ public class ReaderTextCellParallel extends MatrixReader st.reset( value.toString() ); //reinit tokenizer row = st.nextInt()-1; col = st.nextInt()-1; + if(row == -1 || col == -1) continue; double lvalue = st.nextDoubleForParallel(); a.set( row, col, lvalue ); lnnz += (lvalue!=0) ? 1 : 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java index f862f3d..c9f42bf 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java @@ -74,7 +74,7 @@ public class WriterMatrixMarket extends MatrixWriter FSDataOutputStream writer = null; try { writer = fs.create(path); - writer.writeBytes("1 1 0"); + writer.writeBytes(IOUtilFunctions.EMPTY_TEXT_LINE); } finally { IOUtilFunctions.closeSilently(writer); @@ -156,9 +156,8 @@ public class WriterMatrixMarket extends MatrixWriter } //handle empty result - if ( src.isEmptyBlock(false) && rl==0 ) { - br.write("1 1 0\n"); - } + if ( src.isEmptyBlock(false) && rl==0 ) + br.write(IOUtilFunctions.EMPTY_TEXT_LINE); } finally { IOUtilFunctions.closeSilently(br); http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java index 7b39eb3..a3015f2 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java @@ -58,9 +58,10 @@ public class WriterTextCSV extends MatrixWriter throws IOException, DMLRuntimeException { //validity check matrix dimensions - if( src.getNumRows() != rlen || src.getNumColumns() != clen ) { + if( src.getNumRows() != rlen || src.getNumColumns() != clen ) throw new IOException("Matrix dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+"."); - } + if( rlen == 0 || clen == 0 ) + throw new IOException("Write of matrices with zero rows or columns not supported ("+rlen+"x"+clen+")."); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java index 2bef173..2c56220 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java @@ -46,11 +46,11 @@ public class WriterTextCSVParallel extends WriterTextCSV } @Override - protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops) + protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops) throws IOException { //estimate output size and number of output blocks (min 1) - int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), + int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), src.getNonZeros(), OutputInfo.CSVOutputInfo) / 
InfrastructureAnalyzer.getHDFSBlockSize()); numPartFiles = Math.max(numPartFiles, 1); @@ -80,7 +80,7 @@ public class WriterTextCSVParallel extends WriterTextCSV } //wait until all tasks have been executed - List<Future<Object>> rt = pool.invokeAll(tasks); + List<Future<Object>> rt = pool.invokeAll(tasks); pool.shutdown(); //check for exceptions @@ -101,13 +101,12 @@ public class WriterTextCSVParallel extends WriterTextCSV private class WriteCSVTask implements Callable<Object> { - private JobConf _job = null; - private FileSystem _fs = null; - private MatrixBlock _src = null; - private Path _path =null; - private int _rl = -1; - private int _ru = -1; - private CSVFileFormatProperties _props = null; + private final JobConf _job; + private final FileSystem _fs; + private final MatrixBlock _src; + private final Path _path; + private final int _rl, _ru; + private final CSVFileFormatProperties _props; public WriteCSVTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props) { _path = path; @@ -120,8 +119,7 @@ public class WriterTextCSVParallel extends WriterTextCSV } @Override - public Object call() throws Exception - { + public Object call() throws Exception { writeCSVMatrixToFile(_path, _job, _fs, _src, _rl, _ru, _props); return null; } http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java index b0636a3..0438f46 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java @@ -66,7 +66,7 @@ public class WriterTextCell extends MatrixWriter Path path = new Path( fname ); FileSystem fs = IOUtilFunctions.getFileSystem(path); try( FSDataOutputStream writer = 
fs.create(path) ){ - writer.writeBytes("1 1 0"); + writer.writeBytes(IOUtilFunctions.EMPTY_TEXT_LINE); } IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); @@ -126,15 +126,13 @@ public class WriterTextCell extends MatrixWriter br.write( sb.toString() ); //same as append sb.setLength(0); } - } } } //handle empty result - if ( src.isEmptyBlock(false) && rl==0 ) { - br.write("1 1 0\n"); - } + if ( src.isEmptyBlock(false) && rl==0 ) + br.write(IOUtilFunctions.EMPTY_TEXT_LINE); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java index 21c42cd..6f73de1 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java @@ -205,7 +205,8 @@ public class MatrixCharacteristics implements Serializable public boolean mightHaveEmptyBlocks() { long singleBlk = Math.max(Math.min(numRows, numRowsPerBlock),1) * Math.max(Math.min(numColumns, numColumnsPerBlock),1); - return !nnzKnown() || (nonZero < numRows*numColumns - singleBlk); + return !nnzKnown() || numRows==0 || numColumns==0 + || (nonZero < numRows*numColumns - singleBlk); } public static void reorg(MatrixCharacteristics dim, ReorgOperator op, http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java index c4f9a56..f1c9b35 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java +++ 
b/src/main/java/org/apache/sysml/runtime/matrix/WriteCSVMR.java @@ -19,6 +19,7 @@ package org.apache.sysml.runtime.matrix; +import java.io.IOException; import java.util.HashMap; import org.apache.commons.logging.Log; @@ -61,6 +62,12 @@ public class WriteCSVMR JobConf job = new JobConf(WriteCSVMR.class); job.setJobName("WriteCSV-MR"); + //check for valid output dimensions + for( int i=0; i<rlens.length; i++ ) + if( rlens[i] == 0 || clens[i] == 0 ) + throw new IOException("Write of matrices with zero" + + " rows or columns not supported ("+rlens[i]+"x"+clens[i]+")."); + byte[] realIndexes=new byte[inputs.length]; for(byte b=0; b<realIndexes.length; b++) realIndexes[b]=b; http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java b/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java index 036743f..b51a809 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/TextToBinaryCellConverter.java @@ -53,14 +53,18 @@ implements Converter<LongWritable, Text, MatrixIndexes, MatrixCell> hasValue=false; return; } - + //reset the tokenizer st.reset( str ); //convert text to matrix cell indexes.setIndexes( st.nextLong(), st.nextLong() ); + if( indexes.getRowIndex() == 0 || indexes.getColumnIndex() == 0 ) { + hasValue = false; + return; + } value.setValue( st.nextDouble() ); - hasValue = true; + hasValue = true; } @Override @@ -72,14 +76,12 @@ implements Converter<LongWritable, Text, MatrixIndexes, MatrixCell> public Pair<MatrixIndexes, MatrixCell> next() { if(!hasValue) return null; - hasValue=false; return pair; } @Override - public void setBlockSize(int rl, int cl) - { - + public void setBlockSize(int rl, int 
cl) { + //do nothing } } http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java index de34f3a..a7ce655 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockMapper.java @@ -79,8 +79,7 @@ public class ReblockMapper extends MapperBase ReblockInstruction[] reblockInstructions = MRJobConfiguration.getReblockInstructions(job); //get dimension information - for(ReblockInstruction ins: reblockInstructions) - { + for(ReblockInstruction ins: reblockInstructions) { dimensionsIn.put(ins.input, MRJobConfiguration.getMatrixCharacteristicsForInput(job, ins.input)); dimensionsOut.put(ins.output, MRJobConfiguration.getMatrixCharactristicsForReblock(job, ins.output)); emptyBlocks.put(ins.output, ins.outputEmptyBlocks); @@ -90,7 +89,7 @@ public class ReblockMapper extends MapperBase //(buffer size divided by max reblocks per input matrix, because those are shared in JVM) int maxlen = 1; for( ArrayList<ReblockInstruction> rinst : reblock_instructions ) - maxlen = Math.max(maxlen, rinst.size()); //max reblocks per input + maxlen = Math.max(maxlen, rinst.size()); //max reblocks per input buffersize = ReblockBuffer.DEFAULT_BUFFER_SIZE/maxlen; } catch (Exception e) @@ -106,8 +105,7 @@ public class ReblockMapper extends MapperBase super.close(); //flush buffered data - for( Entry<Byte,ReblockBuffer> e : buffer.entrySet() ) - { + for( Entry<Byte,ReblockBuffer> e : buffer.entrySet() ) { ReblockBuffer rbuff = e.getValue(); rbuff.flushBuffer(e.getKey(), cachedCollector); } @@ -142,13 +140,12 @@ public class ReblockMapper extends MapperBase //output part of empty blocks (all mappers contribute 
for better load balance), //where mapper responsibility is distributed over row blocks - long numBlocks = (long)Math.ceil((double)rlen/brlen); + long numBlocks = (long)Math.ceil((double)Math.max(rlen,1)/brlen); long len = (long)Math.ceil((double)numBlocks/numMap); long start = mapID * len * brlen; - long end = Math.min((mapID+1) * len * brlen, rlen); + long end = Math.min((mapID+1) * len * brlen, Math.max(rlen,1)); for(long i=start, r=start/brlen+1; i<end; i+=brlen, r++) - for(long j=0, c=1; j<clen; j+=bclen, c++) - { + for(long j=0, c=1; j<Math.max(clen,1); j+=bclen, c++) { tmpIx.setIndexes(r, c); cachedCollector.collect(tmpIx, tmpVal); } http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java b/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java new file mode 100644 index 0000000..c3a8da4 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/data/WriteReadZeroDimsTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.data; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; + +public class WriteReadZeroDimsTest extends AutomatedTestBase +{ + private final static String TEST_NAME1 = "ZeroDimDataWrite"; + private final static String TEST_NAME2 = "ZeroDimDataRead"; + private final static String TEST_DIR = "functions/data/"; + private final static String TEST_CLASS_DIR = TEST_DIR + WriteReadZeroDimsTest.class.getSimpleName() + "/"; + + private final static int rowsM = 1200; + private final static int colsM = 1100; + + public enum Type{ + Zero_Rows, + Zero_Cols, + } + + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R1" }) ); + addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R2" }) ); + } + + @Test + public void testZeroRowsTextCP() { + runZeroDimsTest(Type.Zero_Rows, "text", ExecType.CP); + } + + @Test + public void testZeroColsTextCP() { + runZeroDimsTest(Type.Zero_Cols, "text", ExecType.CP); + } + + @Test + public void testZeroRowsMmCP() { + runZeroDimsTest(Type.Zero_Rows, "mm", ExecType.CP); + } + + @Test + public void testZeroColsMmCP() { + runZeroDimsTest(Type.Zero_Cols, "mm", ExecType.CP); + } + + @Test + public void testZeroRowsCsvCP() { + runZeroDimsTest(Type.Zero_Rows, "csv", ExecType.CP); + } + + @Test + public void testZeroColsCsvCP() { + runZeroDimsTest(Type.Zero_Cols, "csv", ExecType.CP); + } + + @Test + public void 
testZeroRowsBinCP() { + runZeroDimsTest(Type.Zero_Rows, "binary", ExecType.CP); + } + + @Test + public void testZeroColsBinCP() { + runZeroDimsTest(Type.Zero_Cols, "binary", ExecType.CP); + } + + @Test + public void testZeroRowsTextSP() { + runZeroDimsTest(Type.Zero_Rows, "text", ExecType.SPARK); + } + + @Test + public void testZeroColsTextSP() { + runZeroDimsTest(Type.Zero_Cols, "text", ExecType.SPARK); + } + + @Test + public void testZeroRowsMmSP() { + runZeroDimsTest(Type.Zero_Rows, "mm", ExecType.SPARK); + } + + @Test + public void testZeroColsMmSP() { + runZeroDimsTest(Type.Zero_Cols, "mm", ExecType.SPARK); + } + + @Test + public void testZeroRowsCsvSP() { + runZeroDimsTest(Type.Zero_Rows, "csv", ExecType.SPARK); + } + + @Test + public void testZeroColsCsvSP() { + runZeroDimsTest(Type.Zero_Cols, "csv", ExecType.SPARK); + } + + @Test + public void testZeroRowsBinSP() { + runZeroDimsTest(Type.Zero_Rows, "binary", ExecType.SPARK); + } + + @Test + public void testZeroColsBinSP() { + runZeroDimsTest(Type.Zero_Cols, "binary", ExecType.SPARK); + } + + @Test + public void testZeroRowsTextMR() { + runZeroDimsTest(Type.Zero_Rows, "text", ExecType.MR); + } + + @Test + public void testZeroColsTextMR() { + runZeroDimsTest(Type.Zero_Cols, "text", ExecType.MR); + } + + @Test + public void testZeroRowsMmMR() { + runZeroDimsTest(Type.Zero_Rows, "mm", ExecType.MR); + } + + @Test + public void testZeroColsMmMR() { + runZeroDimsTest(Type.Zero_Cols, "mm", ExecType.MR); + } + + @Test + public void testZeroRowsCsvMR() { + runZeroDimsTest(Type.Zero_Rows, "csv", ExecType.MR); + } + + @Test + public void testZeroColsCsvMR() { + runZeroDimsTest(Type.Zero_Cols, "csv", ExecType.MR); + } + + @Test + public void testZeroRowsBinMR() { + runZeroDimsTest(Type.Zero_Rows, "binary", ExecType.MR); + } + + @Test + public void testZeroColsBinMR() { + runZeroDimsTest(Type.Zero_Cols, "binary", ExecType.MR); + } + + private void runZeroDimsTest( Type type, String format, ExecType et ) + { + int rows = 
(type == Type.Zero_Rows) ? 0 : rowsM; + int cols = (type == Type.Zero_Cols) ? 0 : colsM; + + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ){ + case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break; + default: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + try { + //run write into format + loadTestConfiguration(getTestConfiguration(TEST_NAME1)); + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[]{"-args", String.valueOf(rows), + String.valueOf(cols), output("R1"), format}; + runTest(true, format.equals("csv"), null, -1); + + //run read from format + if( !format.equals("csv") ) { + loadTestConfiguration(getTestConfiguration(TEST_NAME2)); + HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME2 + ".dml"; + programArgs = new String[]{"-args", output("R1"), output("R2")}; + runTest(true, false, null, -1); + + //check overall result + double expected = ((type == Type.Zero_Rows) ? 
colsM : rowsM) * 7; + Assert.assertEquals(new Double(expected), + readDMLMatrixFromHDFS("R2").get(new CellIndex(1,1))); + } + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test/scripts/functions/data/ZeroDimDataRead.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/data/ZeroDimDataRead.dml b/src/test/scripts/functions/data/ZeroDimDataRead.dml new file mode 100644 index 0000000..7ebb917 --- /dev/null +++ b/src/test/scripts/functions/data/ZeroDimDataRead.dml @@ -0,0 +1,29 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +X = read($1); +if( nrow(X) == 0 ) + X = rbind(X, matrix(1, 1, ncol(X))); +else if( ncol(X) == 0 ) + X = cbind(X, matrix(1, nrow(X), 1)); +R = as.matrix(sum(X * 7)); + +write(R, $2); http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test/scripts/functions/data/ZeroDimDataWrite.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/data/ZeroDimDataWrite.dml b/src/test/scripts/functions/data/ZeroDimDataWrite.dml new file mode 100644 index 0000000..21174b3 --- /dev/null +++ b/src/test/scripts/functions/data/ZeroDimDataWrite.dml @@ -0,0 +1,23 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +X = matrix(1, $1, $2); +write(X, $3, format=$4); http://git-wip-us.apache.org/repos/asf/systemml/blob/d753af90/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java index 7b77e4a..1dfd4fe 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/data/ZPackageSuite.java @@ -42,6 +42,7 @@ import org.junit.runners.Suite; SequenceTest.class, VariableTest.class, WriteMMTest.class, + WriteReadZeroDimsTest.class, WriteTest.class, })
