[SYSTEMML-2442] Compiler/runtime integration of matrix market properties This patch generalizes the compiler and runtime to properly parse the matrix market header metadata and check for supported types. Furthermore, we also read and parse these properties at runtime for robustness and to support custom reader configurations. As a basis for subsequent extensions, the sequential/parallel textcell/matrix-market readers have been refactored to avoid code duplication where possible.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/11d11987 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/11d11987 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/11d11987 Branch: refs/heads/master Commit: 11d119877448d8346bd9999389c906e4be857a8a Parents: ebe9e8a Author: Matthias Boehm <[email protected]> Authored: Fri Jul 13 21:32:16 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jul 13 21:32:16 2018 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/parser/DataExpression.java | 84 +++-------- .../controlprogram/caching/CacheableData.java | 2 +- .../controlprogram/caching/FrameObject.java | 2 +- .../controlprogram/caching/MatrixObject.java | 2 +- .../instructions/cp/VariableCPInstruction.java | 12 +- .../spark/ReblockSPInstruction.java | 20 ++- .../instructions/spark/WriteSPInstruction.java | 10 +- .../spark/utils/FrameRDDConverterUtils.java | 8 +- .../spark/utils/RDDConverterUtils.java | 21 +-- .../sysml/runtime/io/FileFormatProperties.java | 37 +++++ .../runtime/io/FileFormatPropertiesCSV.java | 105 ++++++++++++++ .../runtime/io/FileFormatPropertiesMM.java | 142 +++++++++++++++++++ .../sysml/runtime/io/FrameReaderFactory.java | 10 +- .../sysml/runtime/io/FrameReaderTextCSV.java | 5 +- .../runtime/io/FrameReaderTextCSVParallel.java | 3 +- .../sysml/runtime/io/FrameWriterFactory.java | 8 +- .../sysml/runtime/io/FrameWriterTextCSV.java | 11 +- .../runtime/io/FrameWriterTextCSVParallel.java | 9 +- .../sysml/runtime/io/IOUtilFunctions.java | 52 ++++++- .../sysml/runtime/io/MatrixReaderFactory.java | 9 +- .../sysml/runtime/io/MatrixWriterFactory.java | 8 +- .../apache/sysml/runtime/io/ReadProperties.java | 1 - .../apache/sysml/runtime/io/ReaderTextCSV.java | 5 +- .../sysml/runtime/io/ReaderTextCSVParallel.java | 5 +- .../apache/sysml/runtime/io/ReaderTextCell.java | 56 +++++--- 
.../runtime/io/ReaderTextCellParallel.java | 56 +------- .../apache/sysml/runtime/io/WriterTextCSV.java | 11 +- .../sysml/runtime/io/WriterTextCSVParallel.java | 9 +- .../matrix/data/CSVFileFormatProperties.java | 112 --------------- .../matrix/data/FileFormatProperties.java | 54 ------- .../sysml/runtime/util/DataConverter.java | 2 +- .../sysml/runtime/util/MapReduceTool.java | 8 +- .../test/integration/AutomatedTestBase.java | 4 +- .../functions/data/MatrixMarketFormatTest.java | 27 +--- .../functions/frame/FrameConverterTest.java | 4 +- .../functions/frame/FrameReadWriteTest.java | 6 +- .../recompile/CSVReadUnknownSizeTest.java | 4 +- .../transform/FrameCSVReadWriteTest.java | 4 +- .../TransformCSVFrameEncodeDecodeTest.java | 4 +- .../TransformCSVFrameEncodeReadTest.java | 6 +- .../TransformFrameEncodeColmapTest.java | 4 +- .../TransformFrameEncodeDecodeTest.java | 4 +- .../TransformFrameEncodeDecodeTokenTest.java | 6 +- 43 files changed, 511 insertions(+), 441 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/parser/DataExpression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java index acccdf3..eafdd13 100644 --- a/src/main/java/org/apache/sysml/parser/DataExpression.java +++ b/src/main/java/org/apache/sysml/parser/DataExpression.java @@ -20,7 +20,6 @@ package org.apache.sysml.parser; import java.io.BufferedReader; -import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; @@ -36,7 +35,9 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.DataGenOp; import org.apache.sysml.parser.LanguageException.LanguageErrorCodes; import org.apache.sysml.parser.common.CustomErrorListener; +import 
org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.io.FileFormatPropertiesMM; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.UtilFunctions; @@ -641,28 +642,30 @@ public class DataExpression extends DataIdentifier * 1) only allow IO_FILENAME as ONLY valid parameter * * 2) open the file - * A) verify header line (1st line) equals - * B) read and discard comment lines - * C) get size information from sizing info line --- M N L + * A) verify header line (1st line) equals + * B) read and discard comment lines + * C) get size information from sizing info line --- M N L */ // should NOT attempt to read MTD file for MatrixMarket format shouldReadMTD = false; // get metadata from MatrixMarket format file - String[] headerLines = readMatrixMarketFile(inputFileName, conditional); - - // process 1st line of MatrixMarket format -- must be identical to legal header - String legalHeaderMM = "%%MatrixMarket matrix coordinate real general"; + String[] headerLines = null; + try { + headerLines = IOUtilFunctions.readMatrixMarketHeader(inputFileName); + } + catch(DMLRuntimeException ex) { + raiseValidateError(ex.getMessage(), conditional); + } if (headerLines != null && headerLines.length >= 2){ + // process 1st line of MatrixMarket format to check for support types + String firstLine = headerLines[0].trim(); - if (!firstLine.equals(legalHeaderMM)){ - raiseValidateError("Unsupported format in MatrixMarket file: " + - headerLines[0] + ". 
Only supported format in MatrixMarket file has header line " + legalHeaderMM, - conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } - + @SuppressWarnings("unused") + FileFormatPropertiesMM props = FileFormatPropertiesMM.parse(firstLine); + // process 2nd line of MatrixMarket format -- must have size information String secondLine = headerLines[1]; @@ -1813,59 +1816,12 @@ public class DataExpression extends DataIdentifier return retVal; } - - public String[] readMatrixMarketFile(String filename, boolean conditional) - { - String[] retVal = new String[2]; - retVal[0] = new String(""); - retVal[1] = new String(""); - boolean exists = false; - - try - { - Path path = new Path(filename); - FileSystem fs = IOUtilFunctions.getFileSystem(path); - exists = fs.exists(path); - boolean getFileStatusIsDir = fs.getFileStatus(path).isDirectory(); - - if (exists && getFileStatusIsDir){ - raiseValidateError("MatrixMarket files as directories not supported", conditional); - } - else if (exists) { - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path))); - try - { - retVal[0] = in.readLine(); - // skip all commented lines - do { - retVal[1] = in.readLine(); - } while ( retVal[1].charAt(0) == '%' ); - - if ( !retVal[0].startsWith("%%") ) { - raiseValidateError("MatrixMarket files must begin with a header line.", conditional); - } - } - finally { - IOUtilFunctions.closeSilently(in); - } - } - else { - raiseValidateError("Could not find the file: " + filename, conditional); - } - - } catch (IOException e){ - //throw new LanguageException(this.printErrorLocation() + "Error reading MatrixMarket file: " + filename ); - throw new LanguageException(e); - } - - return retVal; - } public boolean checkHasMatrixMarketFormat(String inputFileName, String mtdFileName, boolean conditional) { // Check the MTD file exists. if there is an MTD file, return false. 
JSONObject mtdObject = readMetadataFile(mtdFileName, conditional); - if (mtdObject != null) + if (mtdObject != null) return false; if( MapReduceTool.existsFileOnHDFS(inputFileName) @@ -1876,13 +1832,13 @@ public class DataExpression extends DataIdentifier Path path = new Path(inputFileName); FileSystem fs = IOUtilFunctions.getFileSystem(path); in = new BufferedReader(new InputStreamReader(fs.open(path))); - String headerLine = new String(""); + String headerLine = new String(""); if (in.ready()) headerLine = in.readLine(); return (headerLine !=null && headerLine.startsWith("%%")); } catch(Exception ex) { - throw new LanguageException("Failed to read mtd file.", ex); + throw new LanguageException("Failed to read matrix market header.", ex); } finally { IOUtilFunctions.closeSilently(in); http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index 6ce1cd3..f524251 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -45,12 +45,12 @@ import org.apache.sysml.runtime.instructions.gpu.context.GPUContext; import org.apache.sysml.runtime.instructions.gpu.context.GPUObject; import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; +import org.apache.sysml.runtime.io.FileFormatProperties; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; import org.apache.sysml.runtime.matrix.MetaDataNumItemsByEachReducer; import 
org.apache.sysml.runtime.matrix.MetaData; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.LocalFileUtils; http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java index 2e8801b..b953906 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java @@ -31,6 +31,7 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; +import org.apache.sysml.runtime.io.FileFormatProperties; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; import org.apache.sysml.runtime.io.FrameWriter; @@ -38,7 +39,6 @@ import org.apache.sysml.runtime.io.FrameWriterFactory; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; import org.apache.sysml.runtime.matrix.MetaData; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.OutputInfo; http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index ca704e0..d4a25a4 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -34,10 +34,10 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; +import org.apache.sysml.runtime.io.FileFormatProperties; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; import org.apache.sysml.runtime.matrix.MetaData; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java index cb8a1b4..7bf941d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java @@ -44,14 +44,14 @@ import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import 
org.apache.sysml.runtime.instructions.Instruction; import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysml.runtime.io.FileFormatProperties; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.WriterMatrixMarket; import org.apache.sysml.runtime.io.WriterTextCSV; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; import org.apache.sysml.runtime.matrix.MetaData; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -365,7 +365,7 @@ public class VariableCPInstruction extends CPInstruction { boolean hasHeader = Boolean.parseBoolean(parts[12]); String delim = parts[13]; boolean sparse = Boolean.parseBoolean(parts[14]); - fmtProperties = new CSVFileFormatProperties(hasHeader, delim, sparse) ; + fmtProperties = new FileFormatPropertiesCSV(hasHeader, delim, sparse) ; } else { boolean hasHeader = Boolean.parseBoolean(parts[12]); @@ -375,7 +375,7 @@ public class VariableCPInstruction extends CPInstruction { String naStrings = null; if ( parts.length == 17+extSchema ) naStrings = parts[16]; - fmtProperties = new CSVFileFormatProperties(hasHeader, delim, fill, fillValue, naStrings) ; + fmtProperties = new FileFormatPropertiesCSV(hasHeader, delim, fill, fillValue, naStrings) ; } return new VariableCPInstruction(VariableOperationCode.CreateVariable, in1, in2, in3, iimd, updateType, fmtProperties, schema, opcode, str); } @@ -435,7 +435,7 @@ public class VariableCPInstruction extends CPInstruction { boolean hasHeader = Boolean.parseBoolean(parts[4]); String delim = parts[5]; boolean sparse = Boolean.parseBoolean(parts[6]); - fprops = new 
CSVFileFormatProperties(hasHeader, delim, sparse); + fprops = new FileFormatPropertiesCSV(hasHeader, delim, sparse); in4 = new CPOperand(parts[7]); // description } else { fprops = new FileFormatProperties(); @@ -869,7 +869,7 @@ public class VariableCPInstruction extends CPInstruction { OutputInfo oi = ((MetaDataFormat)mo.getMetaData()).getOutputInfo(); MatrixCharacteristics mc = ((MetaDataFormat)mo.getMetaData()).getMatrixCharacteristics(); if(oi == OutputInfo.CSVOutputInfo) { - WriterTextCSV writer = new WriterTextCSV((CSVFileFormatProperties)_formatProperties); + WriterTextCSV writer = new WriterTextCSV((FileFormatPropertiesCSV)_formatProperties); writer.addHeaderToCSV(mo.getFileName(), fname, mc.getRows(), mc.getCols()); } else if ( oi == OutputInfo.BinaryBlockOutputInfo || oi == OutputInfo.TextCellOutputInfo ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java index 87e9d29..bea9b03 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java @@ -36,9 +36,11 @@ import org.apache.sysml.runtime.instructions.spark.functions.ExtractBlockForBina import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; +import org.apache.sysml.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysml.runtime.io.FileFormatPropertiesMM; +import org.apache.sysml.runtime.io.IOUtilFunctions; import 
org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -116,13 +118,17 @@ public class ReblockSPInstruction extends UnarySPInstruction { MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); if(iinfo == InputInfo.TextCellInputInfo || iinfo == InputInfo.MatrixMarketInputInfo ) { + //get matrix market file properties if necessary + FileFormatPropertiesMM mmProps = (iinfo == InputInfo.MatrixMarketInputInfo) ? + IOUtilFunctions.readAndParseMatrixMarketHeader(mo.getFileName()) : null; + //get the input textcell rdd JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>) sec.getRDDHandleForMatrixObject(mo, iinfo); //convert textcell to binary block - JavaPairRDD<MatrixIndexes, MatrixBlock> out = - RDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, mcOut, outputEmptyBlocks); + JavaPairRDD<MatrixIndexes, MatrixBlock> out = RDDConverterUtils.textCellToBinaryBlock( + sec.getSparkContext(), lines, mcOut, outputEmptyBlocks, mmProps); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); @@ -136,10 +142,10 @@ public class ReblockSPInstruction extends UnarySPInstruction { String delim = ","; boolean fill = false; double fillValue = 0; - if(mo.getFileFormatProperties() instanceof CSVFileFormatProperties + if(mo.getFileFormatProperties() instanceof FileFormatPropertiesCSV && mo.getFileFormatProperties() != null ) { - CSVFileFormatProperties props = (CSVFileFormatProperties) mo.getFileFormatProperties(); + FileFormatPropertiesCSV props = (FileFormatPropertiesCSV) mo.getFileFormatProperties(); hasHeader = props.hasHeader(); delim = props.getDelim(); fill = props.isFill(); @@ -210,10 +216,10 @@ 
public class ReblockSPInstruction extends UnarySPInstruction { String delim = ","; boolean fill = false; double fillValue = 0; - if(fo.getFileFormatProperties() instanceof CSVFileFormatProperties + if(fo.getFileFormatProperties() instanceof FileFormatPropertiesCSV && fo.getFileFormatProperties() != null ) { - CSVFileFormatProperties props = (CSVFileFormatProperties) fo.getFileFormatProperties(); + FileFormatPropertiesCSV props = (FileFormatPropertiesCSV) fo.getFileFormatProperties(); hasHeader = props.hasHeader(); delim = props.getDelim(); fill = props.isFill(); http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java index 675ea5a..20ec7fb 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java @@ -38,10 +38,10 @@ import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; +import org.apache.sysml.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysml.runtime.io.FileFormatProperties; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import 
org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -89,7 +89,7 @@ public class WriteSPInstruction extends SPInstruction { boolean hasHeader = Boolean.parseBoolean(parts[4]); String delim = parts[5]; boolean sparse = Boolean.parseBoolean(parts[6]); - FileFormatProperties formatProperties = new CSVFileFormatProperties(hasHeader, delim, sparse); + FileFormatProperties formatProperties = new FileFormatPropertiesCSV(hasHeader, delim, sparse); inst.setFormatProperties(formatProperties); CPOperand in4 = new CPOperand(parts[8]); inst.input4 = in4; @@ -198,7 +198,7 @@ public class WriteSPInstruction extends SPInstruction { } JavaRDD<String> out = RDDConverterUtils.binaryBlockToCsv( - in1, mc, (CSVFileFormatProperties) formatProperties, true); + in1, mc, (FileFormatPropertiesCSV) formatProperties, true); customSaveTextFile(out, fname, false); @@ -241,7 +241,7 @@ public class WriteSPInstruction extends SPInstruction { customSaveTextFile(out, fname, false); } else if( oi == OutputInfo.CSVOutputInfo ) { - CSVFileFormatProperties props = (formatProperties!=null) ?(CSVFileFormatProperties) formatProperties : null; + FileFormatPropertiesCSV props = (formatProperties!=null) ?(FileFormatPropertiesCSV) formatProperties : null; JavaRDD<String> out = FrameRDDConverterUtils.binaryBlockToCsv(in1, mc, props, true); customSaveTextFile(out, fname, false); } http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 05d7791..c0fc34a 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -55,9 +55,9 @@ import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable; import org.apache.sysml.runtime.instructions.spark.data.SerText; import org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction; +import org.apache.sysml.runtime.io.FileFormatPropertiesCSV; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -122,7 +122,7 @@ public class FrameRDDConverterUtils } public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, - MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) + MatrixCharacteristics mcIn, FileFormatPropertiesCSV props, boolean strict) { JavaPairRDD<Long,FrameBlock> input = in; @@ -637,9 +637,9 @@ public class FrameRDDConverterUtils { private static final long serialVersionUID = 8020608184930291069L; - private CSVFileFormatProperties _props = null; + private FileFormatPropertiesCSV _props = null; - public BinaryBlockToCSVFunction(CSVFileFormatProperties props) { + public BinaryBlockToCSVFunction(FileFormatPropertiesCSV props) { _props = props; } http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java index 
90d94d0..94dfb55 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java @@ -58,9 +58,10 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable; import org.apache.sysml.runtime.instructions.spark.data.SerText; import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines; +import org.apache.sysml.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysml.runtime.io.FileFormatPropertiesMM; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -78,10 +79,10 @@ public class RDDConverterUtils public static final String DF_ID_COLUMN = "__INDEX"; public static JavaPairRDD<MatrixIndexes, MatrixBlock> textCellToBinaryBlock(JavaSparkContext sc, - JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mcOut, boolean outputEmptyBlocks) { - //convert textcell rdd to binary block rdd (w/ partial blocks) + JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mcOut, boolean outputEmptyBlocks, FileFormatPropertiesMM mmProps) { + //convert textcell rdd to binary block rdd (w/ partial blocks) JavaPairRDD<MatrixIndexes, MatrixBlock> out = input.values() - .mapPartitionsToPair(new TextToBinaryBlockFunction(mcOut)); + .mapPartitionsToPair(new TextToBinaryBlockFunction(mcOut, mmProps)); //inject empty blocks (if necessary) if( outputEmptyBlocks && mcOut.mightHaveEmptyBlocks() ) { @@ -135,7 +136,7 @@ public class RDDConverterUtils mc.getRowsPerBlock(), mc.getColsPerBlock())); } - public static JavaRDD<String> 
binaryBlockToCsv(JavaPairRDD<MatrixIndexes,MatrixBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) + public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<MatrixIndexes,MatrixBlock> in, MatrixCharacteristics mcIn, FileFormatPropertiesCSV props, boolean strict) { JavaPairRDD<MatrixIndexes,MatrixBlock> input = in; @@ -499,8 +500,12 @@ public class RDDConverterUtils { private static final long serialVersionUID = 4907483236186747224L; - protected TextToBinaryBlockFunction(MatrixCharacteristics mc) { + @SuppressWarnings("unused") + private final FileFormatPropertiesMM _mmProps; + + protected TextToBinaryBlockFunction(MatrixCharacteristics mc, FileFormatPropertiesMM mmProps) { super(mc); + _mmProps = mmProps; } @Override @@ -897,9 +902,9 @@ public class RDDConverterUtils { private static final long serialVersionUID = 1891768410987528573L; - private CSVFileFormatProperties _props = null; + private FileFormatPropertiesCSV _props = null; - public BinaryBlockToCSVFunction(CSVFileFormatProperties props) { + public BinaryBlockToCSVFunction(FileFormatPropertiesCSV props) { _props = props; } http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FileFormatProperties.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FileFormatProperties.java b/src/main/java/org/apache/sysml/runtime/io/FileFormatProperties.java new file mode 100644 index 0000000..bcf5a54 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/io/FileFormatProperties.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.io; + +public class FileFormatProperties +{ + private String description; + + public FileFormatProperties() { + + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesCSV.java b/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesCSV.java new file mode 100644 index 0000000..5988a10 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesCSV.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.io; + +import java.io.Serializable; + +import org.apache.sysml.parser.DataExpression; + +public class FileFormatPropertiesCSV extends FileFormatProperties implements Serializable +{ + private static final long serialVersionUID = -2870393360885401604L; + + private boolean header; + private String delim; + private boolean fill; + private double fillValue; + private String naStrings; + + private boolean sparse; + + public FileFormatPropertiesCSV() { + // get the default values for CSV properties from the language layer + this.header = DataExpression.DEFAULT_DELIM_HAS_HEADER_ROW; + this.delim = DataExpression.DEFAULT_DELIM_DELIMITER; + this.fill = DataExpression.DEFAULT_DELIM_FILL; + this.fillValue = DataExpression.DEFAULT_DELIM_FILL_VALUE; + this.sparse = DataExpression.DEFAULT_DELIM_SPARSE; + this.naStrings = null; + } + + public FileFormatPropertiesCSV(boolean hasHeader, String delim, boolean fill, double fillValue, String naStrings) { + this.header = hasHeader; + this.delim = delim; + this.fill = fill; + this.fillValue = fillValue; + this.naStrings = naStrings; + } + + public FileFormatPropertiesCSV(boolean hasHeader, String delim, boolean sparse) { + this.header = hasHeader; + this.delim = delim; + this.sparse = sparse; + } + + public boolean hasHeader() { + return header; + } + + public void setHeader(boolean hasHeader) { + this.header = hasHeader; + } + + public String getDelim() { + return delim; + } + + public String getNAStrings() { + return naStrings; + } + + public void setDelim(String delim) { + 
this.delim = delim; + } + + public boolean isFill() { + return fill; + } + + public void setFill(boolean fill) { + this.fill = fill; + } + + public double getFillValue() { + return fillValue; + } + + public void setFillValue(double fillValue) { + this.fillValue = fillValue; + } + + public boolean isSparse() { + return sparse; + } + + public void setSparse(boolean sparse) { + this.sparse = sparse; + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesMM.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesMM.java b/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesMM.java new file mode 100644 index 0000000..0cdbe7c --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/io/FileFormatPropertiesMM.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.io; + +import java.io.Serializable; +import java.util.StringTokenizer; + +import org.apache.sysml.runtime.DMLRuntimeException; + +public class FileFormatPropertiesMM extends FileFormatProperties implements Serializable +{ + private static final long serialVersionUID = -2870393360885401604L; + + public enum MMFormat { + COORDINATE, + ARRAY; + public String toString() { + return this.name().toLowerCase(); + } + } + + public enum MMField { + REAL, + INTEGER, + COMPLEX, + PATTERN; + public String toString() { + return this.name().toLowerCase(); + } + } + + public enum MMSymmetry { + GENERAL, + SYMMETRIC, + SKEW_SYMMETRIC; + public String toString() { + return this.name().toLowerCase().replaceAll("_", "-"); + } + } + + private final MMFormat _fmt; + private final MMField _field; + private final MMSymmetry _symmetry; + + public FileFormatPropertiesMM() { + // get the default values for MM properties + this(MMFormat.COORDINATE, MMField.REAL, MMSymmetry.GENERAL); + } + + public FileFormatPropertiesMM(MMFormat fmt, MMField field, MMSymmetry symmetry) { + _fmt = fmt; + _field = field; + _symmetry = symmetry; + + //check valid combination + if( _field == MMField.PATTERN && (_fmt == MMFormat.ARRAY || _symmetry == MMSymmetry.SKEW_SYMMETRIC) ) { + throw new DMLRuntimeException("MatrixMarket: Invalid combination: " + + _fmt.toString() + " " + _field.toString() + " " + _symmetry.toString() +"."); + } + } + + public MMFormat getFormat() { + return _fmt; + } + + public MMField getField() { + return _field; + } + + public MMSymmetry getSymmetry() { + return _symmetry; + } + + public static FileFormatPropertiesMM parse(String header) { + //example: %%MatrixMarket matrix coordinate real general + //(note: we use a string tokenizer because the individual + //components can be separated by an arbitrary number of spaces) + + StringTokenizer st = new StringTokenizer(header, " "); + + //check basic structure and matrix object + int numTokens = 
st.countTokens(); + if( numTokens != 5 ) + throw new DMLRuntimeException("MatrixMarket: Incorrect number of header tokens: "+numTokens+" (expeced: 5)."); + String type = st.nextToken(); + if( !type.equals("%%MatrixMarket") ) + throw new DMLRuntimeException("MatrixMarket: Incorrect header start: "+type+" (expected: %%MatrixMarket)."); + String object = st.nextToken(); + if( !object.equals("matrix") ) + throw new DMLRuntimeException("MatrixMarket: Incorrect object: "+object+" (expected: matrix)."); + + //check format, field, and + String format = st.nextToken(); + MMFormat fmt = null; + switch( format ) { + //case "array": fmt = MMFormat.ARRAY; break; + case "coordinate": fmt = MMFormat.COORDINATE; break; + default: throw new DMLRuntimeException("MatrixMarket: " + + "Incorrect format: "+format+" (expected array | coordinate)."); + } + String field = st.nextToken(); + MMField f = null; + switch( field ) { + case "real": f = MMField.REAL; break; + case "integer": f = MMField.INTEGER; break; + case "pattern": f = MMField.PATTERN; break; + //note: complex not supported + default: throw new DMLRuntimeException("MatrixMarket: " + + "Incorrect field: "+field+" (expected real | integer | pattern)."); + } + String symmetry = st.nextToken(); + MMSymmetry s = null; + switch( symmetry ) { + case "general": s = MMSymmetry.GENERAL; break; + //case "symmetric": s = MMSymmetry.SYMMETRIC; break; + //case "skew-symmetric": s = MMSymmetry.SKEW_SYMMETRIC; break; + //note: Hermitian not supported + default: throw new DMLRuntimeException("MatrixMarket: " + + "Incorrect symmetry: "+symmetry+" (expected general | symmetric | skew-symmetric)."); + } + + //construct file properties and check valid combination + return new FileFormatPropertiesMM(fmt, f, s); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java index 0f9d634..d8c89e5 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java @@ -22,8 +22,6 @@ package org.apache.sysml.runtime.io; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.CompilerConfig.ConfigType; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.InputInfo; public class FrameReaderFactory @@ -31,7 +29,7 @@ public class FrameReaderFactory public static FrameReader createFrameReader( InputInfo iinfo ) { FileFormatProperties props = (iinfo==InputInfo.CSVInputInfo) ? - new CSVFileFormatProperties() : null; + new FileFormatPropertiesCSV() : null; return createFrameReader(iinfo, props); } @@ -45,12 +43,12 @@ public class FrameReaderFactory reader = new FrameReaderTextCell(); } else if( iinfo == InputInfo.CSVInputInfo ) { - if( props!=null && !(props instanceof CSVFileFormatProperties) ) + if( props!=null && !(props instanceof FileFormatPropertiesCSV) ) throw new DMLRuntimeException("Wrong type of file format properties for CSV writer."); if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) ) - reader = new FrameReaderTextCSVParallel( (CSVFileFormatProperties)props ); + reader = new FrameReaderTextCSVParallel( (FileFormatPropertiesCSV)props ); else - reader = new FrameReaderTextCSV( (CSVFileFormatProperties)props ); + reader = new FrameReaderTextCSV( (FileFormatPropertiesCSV)props ); } else if( iinfo == InputInfo.BinaryBlockInputInfo ) { if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) ) 
http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java index 275d647..6bb7de1 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java @@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.TfUtils; @@ -49,9 +48,9 @@ import org.apache.sysml.runtime.util.UtilFunctions; */ public class FrameReaderTextCSV extends FrameReader { - protected CSVFileFormatProperties _props = null; + protected FileFormatPropertiesCSV _props = null; - public FrameReaderTextCSV(CSVFileFormatProperties props) { + public FrameReaderTextCSV(FileFormatPropertiesCSV props) { _props = props; } http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java index a94d5ef..03712a9 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java @@ -37,7 +37,6 @@ import org.apache.hadoop.mapred.Reporter; import 
org.apache.hadoop.mapred.TextInputFormat; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.parser.Expression.ValueType; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.TfUtils; @@ -49,7 +48,7 @@ import org.apache.sysml.runtime.util.CommonThreadPool; */ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV { - public FrameReaderTextCSVParallel(CSVFileFormatProperties props) { + public FrameReaderTextCSVParallel(FileFormatPropertiesCSV props) { super(props); } http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java index df1ed89..6b2fd2d 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java @@ -22,8 +22,6 @@ package org.apache.sysml.runtime.io; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.CompilerConfig.ConfigType; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.OutputInfo; public class FrameWriterFactory @@ -43,12 +41,12 @@ public class FrameWriterFactory writer = new FrameWriterTextCell(); } else if( oinfo == OutputInfo.CSVOutputInfo ) { - if( props!=null && !(props instanceof CSVFileFormatProperties) ) + if( props!=null && !(props instanceof FileFormatPropertiesCSV) ) throw new DMLRuntimeException("Wrong type of file format properties for CSV writer."); if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS) ) - writer = new FrameWriterTextCSVParallel((CSVFileFormatProperties)props); + writer = new FrameWriterTextCSVParallel((FileFormatPropertiesCSV)props); else - writer = new FrameWriterTextCSV((CSVFileFormatProperties)props); + writer = new FrameWriterTextCSV((FileFormatPropertiesCSV)props); } else if( oinfo == OutputInfo.BinaryBlockOutputInfo ) { if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS) ) http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java index 97dab12..e9bb9be 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java @@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.MapReduceTool; @@ -44,9 +43,9 @@ public class FrameWriterTextCSV extends FrameWriter //(can be set to very large value to disable blocking) public static final int BLOCKSIZE_J = 32; //32 cells (typically ~512B, should be less than write buffer of 1KB) - protected CSVFileFormatProperties _props = null; + protected FileFormatPropertiesCSV _props = null; - public FrameWriterTextCSV( CSVFileFormatProperties props ) { + public FrameWriterTextCSV( FileFormatPropertiesCSV props ) { _props = props; } @@ -71,7 +70,7 @@ public class 
FrameWriterTextCSV extends FrameWriter writeCSVFrameToHDFS(path, job, src, rlen, clen, _props); } - protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties csvprops ) + protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, FileFormatPropertiesCSV csvprops ) throws IOException { FileSystem fs = IOUtilFunctions.getFileSystem(path, job); @@ -81,7 +80,7 @@ public class FrameWriterTextCSV extends FrameWriter IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } - protected static void writeCSVFrameToFile( Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, CSVFileFormatProperties props ) + protected static void writeCSVFrameToFile( Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, FileFormatPropertiesCSV props ) throws IOException { //create buffered writer @@ -93,7 +92,7 @@ public class FrameWriterTextCSV extends FrameWriter //for obj reuse and preventing repeated buffer re-allocations StringBuilder sb = new StringBuilder(); - props = (props==null)? new CSVFileFormatProperties() : props; + props = (props==null)? 
new FileFormatPropertiesCSV() : props; String delim = props.getDelim(); // Write header line, if needed http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java index 460e41c..fd16f7d 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java @@ -33,7 +33,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.CommonThreadPool; @@ -45,12 +44,12 @@ import org.apache.sysml.runtime.util.MapReduceTool; */ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV { - public FrameWriterTextCSVParallel( CSVFileFormatProperties props ) { + public FrameWriterTextCSVParallel( FileFormatPropertiesCSV props ) { super(props); } @Override - protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties csvprops ) + protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, FileFormatPropertiesCSV csvprops ) throws IOException { //estimate output size and number of output blocks (min 1) @@ -110,9 +109,9 @@ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV private FrameBlock _src = null; private int _rl = -1; private int _ru = -1; - private CSVFileFormatProperties _csvprops = 
null; + private FileFormatPropertiesCSV _csvprops = null; - public WriteFileTask(Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, CSVFileFormatProperties csvprops) { + public WriteFileTask(Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, FileFormatPropertiesCSV csvprops) { _path = path; _fs = fs; _job = job; http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index b484dfc..e158f61 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -19,10 +19,12 @@ package org.apache.sysml.runtime.io; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; @@ -47,6 +49,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.LocalFileUtils; import org.apache.sysml.runtime.util.UtilFunctions; @@ -111,9 +114,9 @@ public class IOUtilFunctions try { if( io != null ) io.close(); - } + } catch (Exception ex) { - LOG.error("Failed to close IO resource.", ex); + LOG.error("Failed to close IO resource.", ex); } } @@ -122,9 +125,9 @@ public class IOUtilFunctions try { if( rr != null ) rr.close(); - } + } catch (Exception ex) { - LOG.error("Failed to close record reader.", ex); + LOG.error("Failed to 
close record reader.", ex); } } @@ -330,6 +333,47 @@ public class IOUtilFunctions return numTokens; } + public static FileFormatPropertiesMM readAndParseMatrixMarketHeader(String filename) throws DMLRuntimeException { + String[] header = readMatrixMarketHeader(filename); + return FileFormatPropertiesMM.parse(header[0]); + } + + public static String[] readMatrixMarketHeader(String filename) throws DMLRuntimeException { + String[] retVal = new String[2]; + retVal[0] = new String(""); + retVal[1] = new String(""); + boolean exists = false; + + try { + Path path = new Path(filename); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + exists = fs.exists(path); + boolean getFileStatusIsDir = fs.getFileStatus(path).isDirectory(); + if (exists && getFileStatusIsDir) { + throw new DMLRuntimeException("MatrixMarket files as directories not supported"); + } + else if (exists) { + try( BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path))) ) { + retVal[0] = in.readLine(); + // skip all commented lines + do { + retVal[1] = in.readLine(); + } while ( retVal[1].charAt(0) == '%' ); + if ( !retVal[0].startsWith("%%") ) { + throw new DMLRuntimeException("MatrixMarket files must begin with a header line."); + } + } + } + else { + throw new DMLRuntimeException("Could not find the file: " + filename); + } + } + catch (IOException e){ + throw new DMLRuntimeException(e); + } + return retVal; + } + /** * Returns the number of non-zero entries but avoids the expensive * string to double parsing. 
This function is guaranteed to never http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/MatrixReaderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReaderFactory.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReaderFactory.java index 68a1d2b..a2b2922 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReaderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReaderFactory.java @@ -22,7 +22,6 @@ package org.apache.sysml.runtime.io; import org.apache.sysml.conf.CompilerConfig.ConfigType; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; @@ -44,9 +43,9 @@ public class MatrixReaderFactory else if( iinfo == InputInfo.CSVInputInfo ) { if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) && MatrixBlock.DEFAULT_SPARSEBLOCK == SparseBlock.Type.MCSR ) - reader = new ReaderTextCSVParallel(new CSVFileFormatProperties()); + reader = new ReaderTextCSVParallel(new FileFormatPropertiesCSV()); else - reader = new ReaderTextCSV(new CSVFileFormatProperties()); + reader = new ReaderTextCSV(new FileFormatPropertiesCSV()); } else if( iinfo == InputInfo.BinaryCellInputInfo ) reader = new ReaderBinaryCell(); @@ -81,9 +80,9 @@ public class MatrixReaderFactory } else if( iinfo == InputInfo.CSVInputInfo ) { if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) && MatrixBlock.DEFAULT_SPARSEBLOCK == SparseBlock.Type.MCSR ) - reader = new ReaderTextCSVParallel( props.formatProperties!=null ? 
(CSVFileFormatProperties)props.formatProperties : new CSVFileFormatProperties()); + reader = new ReaderTextCSVParallel( props.formatProperties!=null ? (FileFormatPropertiesCSV)props.formatProperties : new FileFormatPropertiesCSV()); else - reader = new ReaderTextCSV( props.formatProperties!=null ? (CSVFileFormatProperties)props.formatProperties : new CSVFileFormatProperties()); + reader = new ReaderTextCSV( props.formatProperties!=null ? (FileFormatPropertiesCSV)props.formatProperties : new FileFormatPropertiesCSV()); } else if( iinfo == InputInfo.BinaryCellInputInfo ) reader = new ReaderBinaryCell(); http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/MatrixWriterFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixWriterFactory.java b/src/main/java/org/apache/sysml/runtime/io/MatrixWriterFactory.java index f5a93bb..317109c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixWriterFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixWriterFactory.java @@ -22,8 +22,6 @@ package org.apache.sysml.runtime.io; import org.apache.sysml.conf.CompilerConfig.ConfigType; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.OutputInfo; public class MatrixWriterFactory @@ -53,12 +51,12 @@ public class MatrixWriterFactory writer = new WriterMatrixMarket(); } else if( oinfo == OutputInfo.CSVOutputInfo ) { - if( props!=null && !(props instanceof CSVFileFormatProperties) ) + if( props!=null && !(props instanceof FileFormatPropertiesCSV) ) throw new DMLRuntimeException("Wrong type of file format properties for CSV writer."); if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS) ) - writer = new WriterTextCSVParallel((CSVFileFormatProperties)props); + writer = new WriterTextCSVParallel((FileFormatPropertiesCSV)props); else - writer = new WriterTextCSV((CSVFileFormatProperties)props); + writer = new WriterTextCSV((FileFormatPropertiesCSV)props); } else if( oinfo == OutputInfo.BinaryCellOutputInfo ) { writer = new WriterBinaryCell(); http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/ReadProperties.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReadProperties.java b/src/main/java/org/apache/sysml/runtime/io/ReadProperties.java index 9ce11d9..b3c3f40 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReadProperties.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReadProperties.java @@ -19,7 +19,6 @@ package org.apache.sysml.runtime.io; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.InputInfo; public class ReadProperties http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java index e8a8e1a..141b2da 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java @@ -37,16 +37,15 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.CSVReblockMR; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.DenseBlock; import 
org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.util.UtilFunctions; public class ReaderTextCSV extends MatrixReader { - private CSVFileFormatProperties _props = null; + private FileFormatPropertiesCSV _props = null; - public ReaderTextCSV(CSVFileFormatProperties props) { + public ReaderTextCSV(FileFormatPropertiesCSV props) { _props = props; } http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 53cf1d4..871d1ec 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.TextInputFormat; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.util.CommonThreadPool; @@ -58,12 +57,12 @@ import org.apache.sysml.runtime.util.CommonThreadPool; */ public class ReaderTextCSVParallel extends MatrixReader { - private CSVFileFormatProperties _props = null; + private FileFormatPropertiesCSV _props = null; private int _numThreads = 1; private SplitOffsetInfos _offsets = null; - public ReaderTextCSVParallel(CSVFileFormatProperties props) { + public ReaderTextCSVParallel(FileFormatPropertiesCSV props) { _numThreads = OptimizerUtils.getParallelTextReadParallelism(); _props = props; } 
http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java index 1fa8940..321aec4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java @@ -45,10 +45,16 @@ import org.apache.sysml.runtime.util.MapReduceTool; public class ReaderTextCell extends MatrixReader { - private boolean _isMMFile = false; + protected final boolean _allowRawRead; + protected final boolean _isMMFile; + protected FileFormatProperties _mmProps = null; - public ReaderTextCell(InputInfo info) - { + public ReaderTextCell(InputInfo info) { + this(info, true); + } + + public ReaderTextCell(InputInfo info, boolean allowRaw) { + _allowRawRead = allowRaw; _isMMFile = (info == InputInfo.MatrixMarketInputInfo); } @@ -61,22 +67,26 @@ public class ReaderTextCell extends MatrixReader Path path = new Path( fname ); FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + //check existence and non-empty file + checkValidInputFile(fs, path); + + //read matrix market header + if( _isMMFile ) + _mmProps = IOUtilFunctions.readAndParseMatrixMarketHeader(fname); + //allocate output matrix block if( estnnz < 0 ) estnnz = MapReduceTool.estimateNnzBasedOnFileSize(path, rlen, clen, brlen, bclen, 3); MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); - //check existence and non-empty file - checkValidInputFile(fs, path); - //core read - if( fs.isDirectory(path) ) + if( fs.isDirectory(path) || !_allowRawRead ) readTextCellMatrixFromHDFS(path, job, ret, rlen, clen, brlen, bclen); else readRawTextCellMatrixFromHDFS(path, job, fs, ret, rlen, clen, brlen, bclen, _isMMFile); //finally check if change of sparse/dense block 
representation required - if( !ret.isInSparseFormat() ) + if( !AGGREGATE_BLOCK_NNZ ) ret.recomputeNonZeros(); ret.examSparsity(); @@ -94,14 +104,14 @@ public class ReaderTextCell extends MatrixReader readRawTextCellMatrixFromInputStream(is, ret, rlen, clen, brlen, bclen, _isMMFile); //finally check if change of sparse/dense block representation required - if( !ret.isInSparseFormat() ) + if( !AGGREGATE_BLOCK_NNZ ) ret.recomputeNonZeros(); ret.examSparsity(); return ret; } - private static void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen ) + protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen ) throws IOException { boolean sparse = dest.isInSparseFormat(); @@ -112,19 +122,16 @@ public class ReaderTextCell extends MatrixReader LongWritable key = new LongWritable(); Text value = new Text(); - int row = -1; - int col = -1; + int row = -1, col = -1; + long nnz = 0; try { FastStringTokenizer st = new FastStringTokenizer(' '); - for(InputSplit split: splits) - { + for(InputSplit split: splits) { RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL); - - try - { + try { if( sparse ) //SPARSE<-value { while( reader.next(key, value) ) { @@ -148,6 +155,7 @@ public class ReaderTextCell extends MatrixReader if(row == -1 || col == -1) continue; double lvalue = st.nextDouble(); a.set( row, col, lvalue ); + nnz += (lvalue != 0) ? 
1 : 0; } } } @@ -155,6 +163,9 @@ public class ReaderTextCell extends MatrixReader IOUtilFunctions.closeSilently(reader); } } + + if( !dest.isInSparseFormat() ) + dest.setNonZeros(nnz); } catch(Exception ex) { //post-mortem error handling and bounds checking @@ -180,11 +191,13 @@ public class ReaderTextCell extends MatrixReader throws IOException { BufferedReader br = new BufferedReader(new InputStreamReader( is )); + @SuppressWarnings("unused") + FileFormatPropertiesMM mmProps = null; boolean sparse = dest.isInSparseFormat(); String value = null; - int row = -1; - int col = -1; + int row = -1, col = -1; + long nnz = 0; // Read the header lines, if reading from a matrixMarket file if ( matrixMarket ) { @@ -192,6 +205,7 @@ public class ReaderTextCell extends MatrixReader if ( value==null || !value.startsWith("%%") ) { throw new IOException("Error while reading file in MatrixMarket format. Expecting a header line, but encountered, \"" + value +"\"."); } + mmProps = FileFormatPropertiesMM.parse(value); // skip until end-of-comments while( (value = br.readLine())!=null && value.charAt(0) == '%' ) { @@ -209,7 +223,7 @@ public class ReaderTextCell extends MatrixReader } try - { + { FastStringTokenizer st = new FastStringTokenizer(' '); if( sparse ) //SPARSE<-value @@ -236,7 +250,9 @@ public class ReaderTextCell extends MatrixReader if(row == -1 || col == -1) continue; double lvalue = st.nextDouble(); a.set( row, col, lvalue ); + nnz += (lvalue != 0) ? 
1 : 0; } + dest.setNonZeros(nnz); } } catch(Exception ex) { http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index 72e0cde..3a67e56 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -20,14 +20,12 @@ package org.apache.sysml.runtime.io; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -37,9 +35,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.OptimizerUtils; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -65,57 +61,19 @@ import org.apache.sysml.runtime.util.MapReduceTool; * in order the issue described in (2). 
* */ -public class ReaderTextCellParallel extends MatrixReader +public class ReaderTextCellParallel extends ReaderTextCell { private static final long MIN_FILESIZE_MM = 8L * 1024; //8KB - private boolean _isMMFile = false; private int _numThreads = 1; - public ReaderTextCellParallel(InputInfo info) - { - _isMMFile = (info == InputInfo.MatrixMarketInputInfo); + public ReaderTextCellParallel(InputInfo info) { + super(info, false); _numThreads = OptimizerUtils.getParallelTextReadParallelism(); } - - @Override - public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int brlen, int bclen, long estnnz) - throws IOException, DMLRuntimeException - { - //prepare file access - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - Path path = new Path( fname ); - FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - - //check existence and non-empty file - checkValidInputFile(fs, path); - - //allocate output matrix block - if( estnnz < 0 ) - estnnz = MapReduceTool.estimateNnzBasedOnFileSize(path, rlen, clen, brlen, bclen, 3); - MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); - - //core read - readTextCellMatrixFromHDFS(path, job, ret, rlen, clen, brlen, bclen, _isMMFile); - - //finally check if change of sparse/dense block representation required - if( !AGGREGATE_BLOCK_NNZ ) - ret.recomputeNonZeros(); - ret.examSparsity(); - - return ret; - } @Override - public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) - throws IOException, DMLRuntimeException - { - //not implemented yet, fallback to sequential reader - return new ReaderTextCell(_isMMFile ? 
InputInfo.MatrixMarketInputInfo : InputInfo.TextCellInputInfo) - .readMatrixFromInputStream(is, rlen, clen, brlen, bclen, estnnz); - } - - private void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket ) + protected void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen ) throws IOException { int par = _numThreads; @@ -128,7 +86,7 @@ public class ReaderTextCellParallel extends MatrixReader if( _isMMFile ){ long len = MapReduceTool.getFilesizeOnHDFS(path); par = ( len < MIN_FILESIZE_MM ) ? 1: par; - } + } try { @@ -137,12 +95,12 @@ public class ReaderTextCellParallel extends MatrixReader InputSplit[] splits = informat.getSplits(job, par); ArrayList<ReadTask> tasks = new ArrayList<>(); for( InputSplit split : splits ){ - ReadTask t = new ReadTask(split, informat, job, dest, rlen, clen, matrixMarket); + ReadTask t = new ReadTask(split, informat, job, dest, rlen, clen, _isMMFile); tasks.add(t); } //wait until all tasks have been executed - List<Future<Long>> rt = pool.invokeAll(tasks); + List<Future<Long>> rt = pool.invokeAll(tasks); //check for exceptions and aggregate nnz long lnnz = 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java index 3e89a52..90b69a6 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java @@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.CSVReblockMR; -import 
org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.util.MapReduceTool; @@ -47,9 +46,9 @@ public class WriterTextCSV extends MatrixWriter //(can be set to very large value to disable blocking) public static final int BLOCKSIZE_J = 32; //32 cells (typically ~512B, should be less than write buffer of 1KB) - protected CSVFileFormatProperties _props = null; + protected FileFormatPropertiesCSV _props = null; - public WriterTextCSV( CSVFileFormatProperties props ) { + public WriterTextCSV( FileFormatPropertiesCSV props ) { _props = props; } @@ -91,14 +90,14 @@ public class WriterTextCSV extends MatrixWriter IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } - protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops) + protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, FileFormatPropertiesCSV csvprops) throws IOException { //sequential write csv file writeCSVMatrixToFile(path, job, fs, src, 0, (int)src.getNumRows(), csvprops); } - protected static void writeCSVMatrixToFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props ) + protected static void writeCSVMatrixToFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, FileFormatPropertiesCSV props ) throws IOException { boolean sparse = src.isInSparseFormat(); @@ -112,7 +111,7 @@ public class WriterTextCSV extends MatrixWriter //for obj reuse and preventing repeated buffer re-allocations StringBuilder sb = new StringBuilder(); - props = (props==null)? new CSVFileFormatProperties() : props; + props = (props==null)? 
new FileFormatPropertiesCSV() : props; String delim = props.getDelim(); boolean csvsparse = props.isSparse(); http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java index 210b349..7aa7c37 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java @@ -33,7 +33,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.CommonThreadPool; @@ -41,12 +40,12 @@ import org.apache.sysml.runtime.util.MapReduceTool; public class WriterTextCSVParallel extends WriterTextCSV { - public WriterTextCSVParallel( CSVFileFormatProperties props ) { + public WriterTextCSVParallel( FileFormatPropertiesCSV props ) { super( props ); } @Override - protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops) + protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, FileFormatPropertiesCSV csvprops) throws IOException { //estimate output size and number of output blocks (min 1) @@ -106,9 +105,9 @@ public class WriterTextCSVParallel extends WriterTextCSV private final MatrixBlock _src; private final Path _path; private final int _rl, _ru; - private final CSVFileFormatProperties _props; + private final FileFormatPropertiesCSV _props; - 
public WriteCSVTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props) { + public WriteCSVTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, FileFormatPropertiesCSV props) { _path = path; _job = job; _fs = fs; http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/matrix/data/CSVFileFormatProperties.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/CSVFileFormatProperties.java b/src/main/java/org/apache/sysml/runtime/matrix/data/CSVFileFormatProperties.java deleted file mode 100644 index 5458cd0..0000000 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/CSVFileFormatProperties.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.matrix.data; - -import java.io.Serializable; - -import org.apache.sysml.parser.DataExpression; - -public class CSVFileFormatProperties extends FileFormatProperties implements Serializable -{ - private static final long serialVersionUID = -2870393360885401604L; - - private boolean header; - private String delim; - private boolean fill; - private double fillValue; - private String naStrings; - - private boolean sparse; - - public CSVFileFormatProperties() { - super(FileFormat.CSV); - - // get the default values for CSV properties from the language layer - this.header = DataExpression.DEFAULT_DELIM_HAS_HEADER_ROW; - this.delim = DataExpression.DEFAULT_DELIM_DELIMITER; - this.fill = DataExpression.DEFAULT_DELIM_FILL; - this.fillValue = DataExpression.DEFAULT_DELIM_FILL_VALUE; - this.sparse = DataExpression.DEFAULT_DELIM_SPARSE; - this.naStrings = null; - } - - public CSVFileFormatProperties(boolean hasHeader, String delim, boolean fill, double fillValue, String naStrings) { - super(FileFormat.CSV); - - this.header = hasHeader; - this.delim = delim; - this.fill = fill; - this.fillValue = fillValue; - this.naStrings = naStrings; - } - - public CSVFileFormatProperties(boolean hasHeader, String delim, boolean sparse) { - super(FileFormat.CSV); - - this.header = hasHeader; - this.delim = delim; - this.sparse = sparse; - } - - public boolean hasHeader() { - return header; - } - - public void setHeader(boolean hasHeader) { - this.header = hasHeader; - } - - public String getDelim() { - return delim; - } - - public String getNAStrings() { - return naStrings; - } - - public void setDelim(String delim) { - this.delim = delim; - } - - public boolean isFill() { - return fill; - } - - public void setFill(boolean fill) { - this.fill = fill; - } - - public double getFillValue() { - return fillValue; - } - - public void setFillValue(double fillValue) { - this.fillValue = fillValue; - } - - public boolean isSparse() { - return sparse; - } - - 
public void setSparse(boolean sparse) { - this.sparse = sparse; - } - -} http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/matrix/data/FileFormatProperties.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FileFormatProperties.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FileFormatProperties.java deleted file mode 100644 index c03aa99..0000000 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FileFormatProperties.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.matrix.data; - -public class FileFormatProperties -{ - private String description; - - public enum FileFormat { CSV, NATIVE } - - FileFormat fmt; - - public FileFormatProperties() { - fmt = FileFormat.NATIVE; - } - - public FileFormatProperties(FileFormat fmt) { - this.fmt = fmt; - } - - public void setFileFormat(FileFormat fmt) { - this.fmt = fmt; - } - - public FileFormat getFileFormat() { - return fmt; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - -} http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/util/DataConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java index 7aa50d9..f4d3fb5 100644 --- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java @@ -32,6 +32,7 @@ import org.apache.commons.math3.linear.Array2DRowRealMatrix; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.cp.BooleanObject; +import org.apache.sysml.runtime.io.FileFormatProperties; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.io.MatrixReaderFactory; import org.apache.sysml.runtime.io.MatrixWriter; @@ -41,7 +42,6 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CTableMap; import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.DenseBlockFactory; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import 
org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.InputInfo; http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java index 239c5e1..cecd0e3 100644 --- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java +++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java @@ -48,13 +48,13 @@ import org.apache.sysml.parser.DataExpression; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.FileFormatPropertiesCSV; +import org.apache.sysml.runtime.io.FileFormatProperties; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.io.MatrixReaderFactory; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataNumItemsByEachReducer; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; @@ -484,8 +484,8 @@ public class MapReduceTool //handle format type and additional arguments mtd.put(DataExpression.FORMAT_TYPE, OutputInfo.outputInfoToStringExternal(outinfo)); if (outinfo == OutputInfo.CSVOutputInfo) { - CSVFileFormatProperties csvProperties = (formatProperties==null) ? - new CSVFileFormatProperties() : (CSVFileFormatProperties)formatProperties; + FileFormatPropertiesCSV csvProperties = (formatProperties==null) ? 
+ new FileFormatPropertiesCSV() : (FileFormatPropertiesCSV)formatProperties; mtd.put(DataExpression.DELIM_HAS_HEADER_ROW, csvProperties.hasHeader()); mtd.put(DataExpression.DELIM_DELIMITER, csvProperties.getDelim()); } http://git-wip-us.apache.org/repos/asf/systemml/blob/11d11987/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java index 43f5229..703a850 100644 --- a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java +++ b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java @@ -47,10 +47,10 @@ import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.io.FileFormatPropertiesCSV; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -741,7 +741,7 @@ public abstract class AutomatedTestBase //read frame data from hdfs String strFrameFileName = baseDirectory + EXPECTED_DIR + fileName; - CSVFileFormatProperties fprop = new CSVFileFormatProperties(); + FileFormatPropertiesCSV fprop = new FileFormatPropertiesCSV(); fprop.setHeader(true); FrameReader reader = FrameReaderFactory.createFrameReader(iinfo, fprop);
