[SYSTEMML-1800] Utils for reading matrix/frame blocks from streams. In JMLC deployments, models and metadata are often read from resource streams of packaged artifacts. This patch adds some util functions for deserializing matrix and frame blocks directly from such input streams to avoid the expensive read of text formats.
Furthermore, this patch also cleans up various java doc issues that were recently introduced. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0fee3f66 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0fee3f66 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0fee3f66 Branch: refs/heads/master Commit: 0fee3f66615621b8bfeed5f1e453a875b6b5731b Parents: 4536782 Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Jul 21 22:12:00 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sat Jul 22 13:53:17 2017 -0700 ---------------------------------------------------------------------- .../sysml/runtime/instructions/Instruction.java | 10 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 19 +-- .../sysml/runtime/util/LocalFileUtils.java | 146 ++++++++++++++----- 3 files changed, 110 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/0fee3f66/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java b/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java index ad8cb92..6db8c7f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/Instruction.java @@ -71,15 +71,7 @@ public abstract class Instruction return type; } - /** - * Setter for instruction line/column number - * - * @param beginLine beginning line position - * @param endLine ending line position - * @param beginCol beginning column position - * @param endCol ending column position - */ - public void setLocation ( String filename, int beginLine, int endLine, int beginCol, int endCol) { + public void setLocation(String filename, int beginLine, 
int endLine, int beginCol, int endCol) { this.filename = filename; this.beginLine = beginLine; this.endLine = endLine; http://git-wip-us.apache.org/repos/asf/systemml/blob/0fee3f66/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index d5306cb..f3c9f7b 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -1170,6 +1170,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * Recomputes and materializes the number of non-zero values * of the entire matrix block. * + * @return number of non-zeros */ public long recomputeNonZeros() { if( sparse && sparseBlock!=null ) { //SPARSE (max long) @@ -3637,24 +3638,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab return leftIndexingOperations(rhsMatrix, rl, ru, cl, cu, ret, update, null); } - /** - * Method to perform leftIndexing operation for a given lower and upper bounds in row and column dimensions. - * Updated matrix is returned as the output. - * - * Operations to be performed: - * 1) result=this; - * 2) result[rowLower:rowUpper, colLower:colUpper] = rhsMatrix; - * - * @param rhsMatrix matrix - * @param rl row lower - * @param ru row upper - * @param cl column lower - * @param cu column upper - * @param ret ? - * @param update ? 
- * @return matrix block - * @throws DMLRuntimeException if DMLRuntimeException occurs - */ public MatrixBlock leftIndexingOperations(MatrixBlock rhsMatrix, int rl, int ru, int cl, int cu, MatrixBlock ret, UpdateType update, String opcode) throws DMLRuntimeException http://git-wip-us.apache.org/repos/asf/systemml/blob/0fee3f66/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java index 0d74088..deda468 100644 --- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java +++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java @@ -72,52 +72,62 @@ public class LocalFileUtils /** Reads a matrix block from local file system. * - * @param filePathAndName file to read + * @param fname file name to read * @return matrix block * @throws IOException if IOException occurs */ - public static MatrixBlock readMatrixBlockFromLocal(String filePathAndName) throws IOException { - return (MatrixBlock) readWritableFromLocal(filePathAndName, new MatrixBlock()); + public static MatrixBlock readMatrixBlockFromLocal(String fname) throws IOException { + return (MatrixBlock) readWritableFromLocal(fname, new MatrixBlock()); } /** Reads a matrix block from local file system. 
* - * @param filePathAndName file to read + * @param fname file name to read * @param reuse matrix block to reuse * @return matrix block * @throws IOException if IOException occurs */ - public static MatrixBlock readMatrixBlockFromLocal(String filePathAndName, MatrixBlock reuse) throws IOException { - return (MatrixBlock) readWritableFromLocal(filePathAndName, reuse); + public static MatrixBlock readMatrixBlockFromLocal(String fname, MatrixBlock reuse) throws IOException { + return (MatrixBlock) readWritableFromLocal(fname, reuse); } + /** Reads a frame block from local file system. + * + * @param fname file name to read + * @return frame block + * @throws IOException if IOException occurs + */ + public static FrameBlock readFrameBlockFromLocal(String fname) throws IOException { + return (FrameBlock) readWritableFromLocal(fname, new FrameBlock()); + } + /** Reads a matrix/frame block from local file system. * - * @param filePathAndName file to read + * @param fname file name to read * @param matrix if true, read matrix. if false, read frame. * @return cache block (common interface to MatrixBlock and FrameBlock) * @throws IOException if IOException occurs */ - public static CacheBlock readCacheBlockFromLocal(String filePathAndName, boolean matrix) throws IOException { - return (CacheBlock) readWritableFromLocal(filePathAndName, matrix?new MatrixBlock():new FrameBlock()); + public static CacheBlock readCacheBlockFromLocal(String fname, boolean matrix) throws IOException { + return (CacheBlock) readWritableFromLocal(fname, matrix?new MatrixBlock():new FrameBlock()); } /** * Reads an arbitrary writable from local file system, using a fused buffered reader * with special support for matrix blocks. 
* - * @param filePathAndName file to read + * @param fname file name to read * @param ret hadoop writable * @return hadoop writable * @throws IOException if IOException occurs */ - public static Writable readWritableFromLocal(String filePathAndName, Writable ret) + public static Writable readWritableFromLocal(String fname, Writable ret) throws IOException { - FileInputStream fis = new FileInputStream( filePathAndName ); - DataInput in = !(ret instanceof MatrixBlock) ? - new DataInputStream(new BufferedInputStream(fis, BUFFER_SIZE)) : - new FastBufferedDataInputStream(fis, BUFFER_SIZE); + FileInputStream fis = new FileInputStream(fname); + DataInput in = !(ret instanceof MatrixBlock) ? + new DataInputStream(new BufferedInputStream(fis, BUFFER_SIZE)) : + new FastBufferedDataInputStream(fis, BUFFER_SIZE); try { ret.readFields(in); } @@ -129,38 +139,98 @@ public class LocalFileUtils return ret; } + /** + * Reads a matrix block from an input stream, using a fused buffered reader + * with special support for matrix blocks. + * + * @param is input stream to read + * @return matrix block + * @throws IOException if IOException occurs + */ + public static MatrixBlock readMatrixBlockFromStream(InputStream is) throws IOException { + return (MatrixBlock) readWritableFromStream(is, new MatrixBlock()); + } + + /** + * Reads a frame block from an input stream, using a fused buffered reader + * with special support for matrix blocks. + * + * @param is input stream to read + * @return frame block + * @throws IOException if IOException occurs + */ + public static FrameBlock readFrameBlockFromStream(InputStream is) throws IOException { + return (FrameBlock) readWritableFromStream(is, new FrameBlock()); + } + + /** + * Reads an arbitrary writable from an input stream, using a fused buffered reader + * with special support for matrix blocks. 
+ * + * @param is input stream to read + * @param ret hadoop writable + * @return hadoop writable + * @throws IOException if IOException occurs + */ + public static Writable readWritableFromStream(InputStream is, Writable ret) + throws IOException + { + DataInput in = !(ret instanceof MatrixBlock) ? + new DataInputStream(new BufferedInputStream(is, BUFFER_SIZE)) : + new FastBufferedDataInputStream(is, BUFFER_SIZE); + try { + ret.readFields(in); + } + finally { + IOUtilFunctions.closeSilently((InputStream)in); + IOUtilFunctions.closeSilently(is); + } + + return ret; + } + /** Writes a matrix block to local file system. * - * @param filePathAndName file to write + * @param fname file name to write * @param mb matrix block * @throws IOException if IOException occurs */ - public static void writeMatrixBlockToLocal(String filePathAndName, MatrixBlock mb) throws IOException { - writeWritableToLocal(filePathAndName, mb); + public static void writeMatrixBlockToLocal(String fname, MatrixBlock mb) throws IOException { + writeWritableToLocal(fname, mb); + } + + /** Writes a frame block to local file system. + * + * @param fname file name to write + * @param fb fame block + * @throws IOException if IOException occurs + */ + public static void writeFrameBlockToLocal(String fname, FrameBlock fb) throws IOException { + writeWritableToLocal(fname, fb); } /** Writes a matrix/frame block to local file system. 
* - * @param filePathAndName file to write + * @param fname file name to write * @param cb cache block (common interface to matrix block and frame block) * @throws IOException if IOException occurs */ - public static void writeCacheBlockToLocal(String filePathAndName, CacheBlock cb) throws IOException { - writeWritableToLocal(filePathAndName, cb); + public static void writeCacheBlockToLocal(String fname, CacheBlock cb) throws IOException { + writeWritableToLocal(fname, cb); } /** * Writes an arbitrary writable to local file system, using a fused buffered writer * with special support for matrix blocks. * - * @param filePathAndName file to write + * @param fname file name to write * @param mb Hadoop writable * @throws IOException if IOException occurs */ - public static void writeWritableToLocal(String filePathAndName, Writable mb) + public static void writeWritableToLocal(String fname, Writable mb) throws IOException { - FileOutputStream fos = new FileOutputStream( filePathAndName ); + FileOutputStream fos = new FileOutputStream( fname ); FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE); try { @@ -172,13 +242,13 @@ public class LocalFileUtils } } - public static void writeByteArrayToLocal( String filePathAndName, byte[] data ) + public static void writeByteArrayToLocal( String fname, byte[] data ) throws IOException { //byte array write via java.nio file channel ~10-15% faster than java.io FileChannel channel = null; try { - Path path = Paths.get(filePathAndName); + Path path = Paths.get(fname); channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE); channel.write(ByteBuffer.wrap(data)); @@ -188,15 +258,15 @@ public class LocalFileUtils } } - public static int readBlockSequenceFromLocal( String filePathAndName, Pair<MatrixIndexes,MatrixValue>[] outValues, HashMap<MatrixIndexes, Integer> outMap) + public static int readBlockSequenceFromLocal(String fname, + 
Pair<MatrixIndexes,MatrixValue>[] outValues, HashMap<MatrixIndexes, Integer> outMap) throws IOException { - FileInputStream fis = new FileInputStream( filePathAndName ); - FastBufferedDataInputStream in = new FastBufferedDataInputStream( fis, BUFFER_SIZE ); + FileInputStream fis = new FileInputStream(fname); + FastBufferedDataInputStream in = new FastBufferedDataInputStream(fis, BUFFER_SIZE); int bufferSize = 0; - try - { + try { int len = in.readInt(); for( int i=0; i<len; i++ ) { outValues[i].getKey().readFields(in); @@ -214,13 +284,13 @@ public class LocalFileUtils return bufferSize; } - public static void writeBlockSequenceToLocal( String filePathAndName, Pair<MatrixIndexes,MatrixValue>[] inValues, int len ) + public static void writeBlockSequenceToLocal(String fname, Pair<MatrixIndexes,MatrixValue>[] inValues, int len) throws IOException { if( len > inValues.length ) throw new IOException("Invalid length of block sequence: len="+len+" vs data="+inValues.length); - FileOutputStream fos = new FileOutputStream( filePathAndName ); + FileOutputStream fos = new FileOutputStream(fname); FastBufferedDataOutputStream out = new FastBufferedDataOutputStream(fos, BUFFER_SIZE); try @@ -281,7 +351,7 @@ public class LocalFileUtils return ret; } - public static void setLocalFilePermissions( File file, String permissions ) + public static void setLocalFilePermissions(File file, String permissions) { //note: user and group treated the same way char[] c = permissions.toCharArray(); @@ -310,7 +380,7 @@ public class LocalFileUtils return createWorkingDirectoryWithUUID( DMLScript.getUUID() ); } - public static String createWorkingDirectoryWithUUID( String uuid ) + public static String createWorkingDirectoryWithUUID(String uuid) throws DMLRuntimeException { //create local tmp dir if not existing @@ -407,7 +477,7 @@ public class LocalFileUtils return count; } - public static String getWorkingDir( String category ) + public static String getWorkingDir(String category) throws 
DMLRuntimeException { if( _workingDir == null ) @@ -422,7 +492,7 @@ public class LocalFileUtils return sb.toString(); } - public static String getUniqueWorkingDir( String category ) + public static String getUniqueWorkingDir(String category) throws DMLRuntimeException { if( _workingDir == null ) @@ -446,7 +516,7 @@ public class LocalFileUtils * @param text content of text file * @throws IOException */ - public static void writeTextFile( File file, String text ) + public static void writeTextFile(File file, String text) throws IOException { Writer writer = null;