Repository: systemml Updated Branches: refs/heads/master bc781fcef -> 26fe72f22
[SYSTEMML-1984] Fix robustness JMLC config mgmt, read from input streams This patch fixes special cases of JMLC configuration management, specifically when a connection is created in one thread but used in other threads to read data from input streams. Because the thread-local configurations were not set in those other threads, this API call failed when trying to read via a multi-threaded reader. We now ensure that reads set the proper thread-local configurations, and we avoid unnecessary exceptions by falling back to sequential readers where the parallel readers do not yet support reading from input streams. Additionally, the frame reader factory now checks the parallel text-format read flag (instead of the binary-format flag) when selecting text-cell and CSV frame readers. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/26fe72f2 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/26fe72f2 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/26fe72f2 Branch: refs/heads/master Commit: 26fe72f2283c2b3e6a7ed1f59d96c73f42e0fbef Parents: bc781fc Author: Matthias Boehm <[email protected]> Authored: Tue Nov 7 18:16:53 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Tue Nov 7 18:17:03 2017 -0800 ---------------------------------------------------------------------- .../org/apache/sysml/api/jmlc/Connection.java | 31 ++++++++++++-------- .../sysml/runtime/io/FrameReaderFactory.java | 6 ++-- .../sysml/runtime/io/ReaderTextCSVParallel.java | 4 ++- .../runtime/io/ReaderTextCellParallel.java | 4 ++- 4 files changed, 28 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/api/jmlc/Connection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java index 2568977..705d784 100644 --- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java +++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java @@ -131,9 +131,7 @@ public class Connection implements 
Closeable //create default configuration _dmlconf = new DMLConfig(); - //set thread-local configurations for compilation and read - ConfigurationManager.setLocalConfig(_dmlconf); - ConfigurationManager.setLocalConfig(_cconf); + setLocalConfigs(); } /** @@ -149,10 +147,7 @@ public class Connection implements Closeable //set optional compiler configurations in current config for( ConfigType configType : configs ) _cconf.set(configType, true); - - //set thread-local configurations for compilation and read - ConfigurationManager.setLocalConfig(_dmlconf); - ConfigurationManager.setLocalConfig(_cconf); + setLocalConfigs(); } /** @@ -214,13 +209,11 @@ public class Connection implements Closeable if( invalidVars.length > 0 ) throw new LanguageException("Invalid variable names: "+Arrays.toString(invalidVars)); + setLocalConfigs(); + //simplified compilation chain Program rtprog = null; try { - //set thread-local configurations for compilation - ConfigurationManager.setLocalConfig(_dmlconf); - ConfigurationManager.setLocalConfig(_cconf); - //parsing ParserWrapper parser = ParserFactory.createParser(parsePyDML ? ScriptType.PYDML : ScriptType.DML); DMLProgram prog = parser.parse(null, script, args); @@ -368,6 +361,8 @@ public class Connection implements Closeable public double[][] readDoubleMatrix(String fname, InputInfo iinfo, long rows, long cols, int brlen, int bclen, long nnz) throws IOException { + setLocalConfigs(); + try { MatrixReader reader = MatrixReaderFactory.createMatrixReader(iinfo); MatrixBlock mb = reader.readMatrixFromHDFS(fname, rows, cols, brlen, bclen, nnz); @@ -534,6 +529,8 @@ public class Connection implements Closeable throw new IOException("Invalid input format (expected: csv, text or mm): "+format); } + setLocalConfigs(); + try { //read input matrix InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
@@ -574,7 +571,7 @@ public class Connection implements Closeable long rows = jmtd.getLong(DataExpression.READROWPARAM); long cols = jmtd.getLong(DataExpression.READCOLPARAM); String format = jmtd.getString(DataExpression.FORMAT_TYPE); - InputInfo iinfo = InputInfo.stringExternalToInputInfo(format); + InputInfo iinfo = InputInfo.stringExternalToInputInfo(format); //read frame file return readStringFrame(fname, iinfo, rows, cols); @@ -598,6 +595,8 @@ public class Connection implements Closeable public String[][] readStringFrame(String fname, InputInfo iinfo, long rows, long cols) throws IOException { + setLocalConfigs(); + try { FrameReader reader = FrameReaderFactory.createFrameReader(iinfo); FrameBlock mb = reader.readFrameFromHDFS(fname, rows, cols); @@ -764,6 +763,8 @@ public class Connection implements Closeable throw new IOException("Invalid input format (expected: csv, text or mm): "+format); } + setLocalConfigs(); + try { //read input frame InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
@@ -863,4 +864,10 @@ public class Connection implements Closeable public FrameBlock readTransformMetaDataFromPath(String spec, String metapath, String colDelim) throws IOException { return TfMetaUtils.readTransformMetaDataFromPath(spec, metapath, colDelim); } + + private void setLocalConfigs() { + //set thread-local configurations for compilation and read + ConfigurationManager.setLocalConfig(_dmlconf); + ConfigurationManager.setLocalConfig(_cconf); + } } http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java index 6300b32..efb4014 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java @@ -44,7 +44,7 @@ public class FrameReaderFactory FrameReader reader = null; if( iinfo == InputInfo.TextCellInputInfo ) { - if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) ) + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) ) reader = new FrameReaderTextCellParallel(); else reader = new FrameReaderTextCell(); @@ -52,7 +52,7 @@ public class FrameReaderFactory else if( iinfo == InputInfo.CSVInputInfo ) { if( props!=null && !(props instanceof CSVFileFormatProperties) ) throw new DMLRuntimeException("Wrong type of file format properties for CSV writer."); - if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) ) + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS) ) reader = new FrameReaderTextCSVParallel( (CSVFileFormatProperties)props ); else reader = new FrameReaderTextCSV( (CSVFileFormatProperties)props ); @@ -65,7 +65,7 @@ public class 
FrameReaderFactory } else { throw new DMLRuntimeException("Failed to create frame reader for unknown input info: " - + InputInfo.inputInfoToString(iinfo)); + + InputInfo.inputInfoToString(iinfo)); } return reader; http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index eaf5d7f..f145d0a 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -116,7 +116,9 @@ public class ReaderTextCSVParallel extends MatrixReader public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) throws IOException, DMLRuntimeException { - throw new DMLRuntimeException("Not implemented yet."); + //not implemented yet, fallback to sequential reader + return new ReaderTextCSV(_props) + .readMatrixFromInputStream(is, rlen, clen, brlen, bclen, estnnz); } private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job, http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index b2a9608..2afbd7e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -107,7 +107,9 @@ public class ReaderTextCellParallel extends MatrixReader public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int 
bclen, long estnnz) throws IOException, DMLRuntimeException { - throw new DMLRuntimeException("Not implemented yet."); + //not implemented yet, fallback to sequential reader + return new ReaderTextCell(_isMMFile ? InputInfo.MatrixMarketInputInfo : InputInfo.TextCellInputInfo) + .readMatrixFromInputStream(is, rlen, clen, brlen, bclen, estnnz); } private void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket )
