Repository: systemml
Updated Branches:
  refs/heads/master bc781fcef -> 26fe72f22


[SYSTEMML-1984] Fix robustness JMLC config mgmt, read from input streams

This patch fixes special cases of JMLC configuration management,
specifically the case where a connection is created in one thread but
then used in other threads to read data from input streams. Because the
thread-local configurations were lost in those other threads, such API
calls failed when trying to read via a multi-threaded reader. We now
ensure that reads have the proper thread-local configurations and also
avoid unnecessary exceptions by falling back to sequential readers if
necessary.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/26fe72f2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/26fe72f2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/26fe72f2

Branch: refs/heads/master
Commit: 26fe72f2283c2b3e6a7ed1f59d96c73f42e0fbef
Parents: bc781fc
Author: Matthias Boehm <[email protected]>
Authored: Tue Nov 7 18:16:53 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Tue Nov 7 18:17:03 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/api/jmlc/Connection.java   | 31 ++++++++++++--------
 .../sysml/runtime/io/FrameReaderFactory.java    |  6 ++--
 .../sysml/runtime/io/ReaderTextCSVParallel.java |  4 ++-
 .../runtime/io/ReaderTextCellParallel.java      |  4 ++-
 4 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java 
b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index 2568977..705d784 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -131,9 +131,7 @@ public class Connection implements Closeable
                //create default configuration
                _dmlconf = new DMLConfig();
                
-               //set thread-local configurations for compilation and read
-               ConfigurationManager.setLocalConfig(_dmlconf);
-               ConfigurationManager.setLocalConfig(_cconf);
+               setLocalConfigs();
        }
        
        /**
@@ -149,10 +147,7 @@ public class Connection implements Closeable
                //set optional compiler configurations in current config
                for( ConfigType configType : configs )
                        _cconf.set(configType, true);
-               
-               //set thread-local configurations for compilation and read
-               ConfigurationManager.setLocalConfig(_dmlconf);
-               ConfigurationManager.setLocalConfig(_cconf);
+               setLocalConfigs();
        }
        
        /**
@@ -214,13 +209,11 @@ public class Connection implements Closeable
                if( invalidVars.length > 0 )
                        throw new LanguageException("Invalid variable names: 
"+Arrays.toString(invalidVars));
                
+               setLocalConfigs();
+               
                //simplified compilation chain
                Program rtprog = null;
                try {
-                       //set thread-local configurations for compilation
-                       ConfigurationManager.setLocalConfig(_dmlconf);
-                       ConfigurationManager.setLocalConfig(_cconf);
-                       
                        //parsing
                        ParserWrapper parser = 
ParserFactory.createParser(parsePyDML ? ScriptType.PYDML : ScriptType.DML);
                        DMLProgram prog = parser.parse(null, script, args);
@@ -368,6 +361,8 @@ public class Connection implements Closeable
        public double[][] readDoubleMatrix(String fname, InputInfo iinfo, long 
rows, long cols, int brlen, int bclen, long nnz) 
                throws IOException
        {
+               setLocalConfigs();
+               
                try {
                        MatrixReader reader = 
MatrixReaderFactory.createMatrixReader(iinfo);
                        MatrixBlock mb = reader.readMatrixFromHDFS(fname, rows, 
cols, brlen, bclen, nnz);
@@ -534,6 +529,8 @@ public class Connection implements Closeable
                        throw new IOException("Invalid input format (expected: 
csv, text or mm): "+format);
                }
                
+               setLocalConfigs();
+               
                try {
                        //read input matrix
                        InputInfo iinfo = 
DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
@@ -574,7 +571,7 @@ public class Connection implements Closeable
                        long rows = jmtd.getLong(DataExpression.READROWPARAM);
                        long cols = jmtd.getLong(DataExpression.READCOLPARAM);
                        String format = 
jmtd.getString(DataExpression.FORMAT_TYPE);
-                       InputInfo iinfo = 
InputInfo.stringExternalToInputInfo(format);                  
+                       InputInfo iinfo = 
InputInfo.stringExternalToInputInfo(format);
                
                        //read frame file
                        return readStringFrame(fname, iinfo, rows, cols);
@@ -598,6 +595,8 @@ public class Connection implements Closeable
        public String[][] readStringFrame(String fname, InputInfo iinfo, long 
rows, long cols) 
                throws IOException
        {
+               setLocalConfigs();
+               
                try {
                        FrameReader reader = 
FrameReaderFactory.createFrameReader(iinfo);
                        FrameBlock mb = reader.readFrameFromHDFS(fname, rows, 
cols);
@@ -764,6 +763,8 @@ public class Connection implements Closeable
                        throw new IOException("Invalid input format (expected: 
csv, text or mm): "+format);
                }
                
+               setLocalConfigs();
+               
                try {
                        //read input frame
                        InputInfo iinfo = 
DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
@@ -863,4 +864,10 @@ public class Connection implements Closeable
        public FrameBlock readTransformMetaDataFromPath(String spec, String 
metapath, String colDelim) throws IOException {
                return TfMetaUtils.readTransformMetaDataFromPath(spec, 
metapath, colDelim);
        }
+       
+       private void setLocalConfigs() {
+               //set thread-local configurations for compilation and read
+               ConfigurationManager.setLocalConfig(_dmlconf);
+               ConfigurationManager.setLocalConfig(_cconf);
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
index 6300b32..efb4014 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
@@ -44,7 +44,7 @@ public class FrameReaderFactory
                FrameReader reader = null;
 
                if( iinfo == InputInfo.TextCellInputInfo ) {
-                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS)
 )
+                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS)
 )
                                reader = new FrameReaderTextCellParallel();
                        else    
                                reader = new FrameReaderTextCell();
@@ -52,7 +52,7 @@ public class FrameReaderFactory
                else if( iinfo == InputInfo.CSVInputInfo ) {
                        if( props!=null && !(props instanceof 
CSVFileFormatProperties) )
                                throw new DMLRuntimeException("Wrong type of 
file format properties for CSV writer.");
-                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS)
 )
+                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_TEXTFORMATS)
 )
                                reader = new FrameReaderTextCSVParallel( 
(CSVFileFormatProperties)props );
                        else
                                reader = new FrameReaderTextCSV( 
(CSVFileFormatProperties)props );
@@ -65,7 +65,7 @@ public class FrameReaderFactory
                }
                else {
                        throw new DMLRuntimeException("Failed to create frame 
reader for unknown input info: "
-                                                  + 
InputInfo.inputInfoToString(iinfo));
+                               + InputInfo.inputInfoToString(iinfo));
                }
                
                return reader;

http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index eaf5d7f..f145d0a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -116,7 +116,9 @@ public class ReaderTextCSVParallel extends MatrixReader
        public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, 
long clen, int brlen, int bclen, long estnnz) 
                throws IOException, DMLRuntimeException 
        {
-               throw new DMLRuntimeException("Not implemented yet.");
+               //not implemented yet, fallback to sequential reader
+               return new ReaderTextCSV(_props)
+                       .readMatrixFromInputStream(is, rlen, clen, brlen, 
bclen, estnnz);
        }
        
        private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, 
JobConf job, 

http://git-wip-us.apache.org/repos/asf/systemml/blob/26fe72f2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index b2a9608..2afbd7e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -107,7 +107,9 @@ public class ReaderTextCellParallel extends MatrixReader
        public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, 
long clen, int brlen, int bclen, long estnnz) 
                throws IOException, DMLRuntimeException 
        {
-               throw new DMLRuntimeException("Not implemented yet.");
+               //not implemented yet, fallback to sequential reader
+               return new ReaderTextCell(_isMMFile ? 
InputInfo.MatrixMarketInputInfo : InputInfo.TextCellInputInfo)
+                       .readMatrixFromInputStream(is, rlen, clen, brlen, 
bclen, estnnz);
        }
        
        private void readTextCellMatrixFromHDFS( Path path, JobConf job, 
MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean 
matrixMarket )

Reply via email to