http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 9e30f5c..7743b61 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -19,40 +19,13 @@ package org.apache.sysml.runtime.transform;
-import java.io.EOFException;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.regex.Pattern;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.Lop;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
-@SuppressWarnings("deprecation")
-public class TfUtils implements Serializable{
-
+public class TfUtils implements Serializable
+{
     private static final long serialVersionUID = 526252850872633125L;

     protected enum ColumnTypes {
@@ -89,9 +62,7 @@ public class TfUtils implements Serializable{
     //transform meta data constants (old file-based transform)
     public static final String TXMTD_SEP = ",";
-    public static final String TXMTD_COLTYPES = "coltypes.csv";
     public static final String TXMTD_COLNAMES = "column.names";
-    public static final String TXMTD_DC_COLNAMES = "dummycoded.column.names";
     public static final String TXMTD_RCD_MAP_SUFFIX = ".map";
     public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct";
     public static final String TXMTD_BIN_FILE_SUFFIX = ".bin";
@@ -101,184 +72,14 @@ public class TfUtils implements Serializable{
     public static final String JSON_MTHD = "methods";
     public static final String JSON_CONSTS = "constants";
     public static final String JSON_NBINS = "numbins";
-    protected static final String MODE_FILE_SUFFIX = ".mode";
-    protected static final String SCALE_FILE_SUFFIX = ".scale";
-    protected static final String DCD_FILE_NAME = "dummyCodeMaps.csv";
-    protected static final String DCD_NAME_SEP = "_";
-
-
-    private OmitAgent _oa = null;
-    private MVImputeAgent _mia = null;
-    private RecodeAgent _ra = null;
-    private BinAgent _ba = null;
-    private DummycodeAgent _da = null;
-
-    private long _numRecordsInPartFile;  // Total number of records in the data file
-    private long _numValidRecords;       // (_numRecordsInPartFile - #of omitted records)
-    private long _numTransformedRows;    // Number of rows after applying transformations
-    private long _numTransformedColumns; // Number of columns after applying transformations

     private String _headerLine = null;
     private boolean _hasHeader;
     private Pattern _delim = null;
     private String _delimString = null;
     private String[] _NAstrings = null;
-    private String[] _outputColumnNames = null;
     private int _numInputCols = -1;
-    private String _tfMtdDir = null;
-    private String _spec = null;
-    private String _offsetFile = null;
-    private String _tmpDir = null;
-    private String _outputPath = null;
-
-    public TfUtils(JobConf job, boolean minimal)
-        throws IOException, JSONException
-    {
-        if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-            ConfigurationManager.setCachedJobConf(job);
-        }
-        _NAstrings = TfUtils.parseNAStrings(job);
-        _spec = job.get(MRJobConfiguration.TF_SPEC);
-        _oa = new OmitAgent(new JSONObject(_spec), null, -1);
-    }
-
-    // called from GenTFMtdMapper, ApplyTf (Hadoop)
-    public TfUtils(JobConf job)
-        throws IOException, JSONException
-    {
-        if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-            ConfigurationManager.setCachedJobConf(job);
-        }
-
-        boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
-        String[] naStrings = TfUtils.parseNAStrings(job);
-        long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) ); // #cols input data
-        String spec = job.get(MRJobConfiguration.TF_SPEC);
-        String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
-        String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
-        String outputPath = FileOutputFormat.getOutputPath(job).toString();
-        JSONObject jspec = new JSONObject(spec);
-
-        init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, jspec, numCols, offsetFile, tmpPath, outputPath);
-    }
-
-    // called from GenTfMtdReducer
-    public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException
-    {
-        this(job);
-        _tfMtdDir = tfMtdDir;
-    }
-
-    // called from GenTFMtdReducer and ApplyTf (Spark)
-    public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
-        init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
-        _tfMtdDir = tfMtdDir;
-    }
-
-    protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
-        throws IOException {
-        // check non-existing file
-        if (!fs.exists(path))
-            if ( err )
-                throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
-            else
-                return false;
-
-        // check for empty file
-        if( MapReduceTool.isFileEmpty(fs, path) )
-            if ( err )
-                throw new EOFException("Empty input file " + path.toString() + ".");
-            else
-                return false;
-
-        return true;
-    }
-
-    public static String getPartFileName(JobConf job) throws IOException {
-        Path path = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
-        FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-        path = path.makeQualified(fs);
-        return path.toString();
-    }
-
-    public static boolean isPartFileWithHeader(JobConf job) throws IOException {
-        String thisfile=getPartFileName(job);
-        Path path = new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE));
-        FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-        path = path.makeQualified(fs);
-        return thisfile.toString().equals(path.toString());
-    }
-
-    /**
-     * Prepare NA strings so that they can be sent to workers via JobConf.
-     * A "dummy" string is added at the end to handle the case of empty strings.
-     * @param na NA string
-     * @return NA string concatenated with NA string separator concatenated with "dummy"
-     */
-    public static String prepNAStrings(String na) {
-        return na + DataExpression.DELIM_NA_STRING_SEP + "dummy";
-    }
-
-    public static String[] parseNAStrings(String na)
-    {
-        if ( na == null )
-            return null;
-
-        String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
-        return tmp; //Arrays.copyOf(tmp, tmp.length-1);
-    }
-
-    public static String[] parseNAStrings(JobConf job)
-    {
-        return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
-    }
-
-    private void createAgents(JSONObject spec, String[] naStrings)
-        throws IOException, JSONException
-    {
-        _oa = new OmitAgent(spec, _outputColumnNames, _numInputCols);
-        _mia = new MVImputeAgent(spec, null, naStrings, _numInputCols);
-        _ra = new RecodeAgent(spec, _outputColumnNames, _numInputCols);
-        _ba = new BinAgent(spec, _outputColumnNames, _numInputCols);
-        _da = new DummycodeAgent(spec, _outputColumnNames, _numInputCols);
-    }
-
-    private void parseColumnNames() {
-        _outputColumnNames = _delim.split(_headerLine, -1);
-        for(int i=0; i < _outputColumnNames.length; i++)
-            _outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
-    }
-
-    private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
-    {
-        _numRecordsInPartFile = 0;
-        _numValidRecords = 0;
-        _numTransformedRows = 0;
-        _numTransformedColumns = 0;
-
-        //TODO: fix hard-wired header propagation to meta data column names
-
-        _headerLine = headerLine;
-        _hasHeader = hasHeader;
-        _delimString = delim;
-        _delim = Pattern.compile(Pattern.quote(delim));
-        _NAstrings = naStrings;
-        _numInputCols = (int)numCols;
-        _offsetFile = offsetFile;
-        _tmpDir = tmpPath;
-        _outputPath = outputPath;
-
-        parseColumnNames();
-        createAgents(spec, naStrings);
-    }
-
-    public void incrValid() { _numValidRecords++; }
-    public long getValid() { return _numValidRecords; }
-    public long getTotal() { return _numRecordsInPartFile; }
-    public long getNumTransformedRows() { return _numTransformedRows; }
-    public long getNumTransformedColumns() { return _numTransformedColumns; }
-
     public String getHeader() { return _headerLine; }
     public boolean hasHeader() { return _hasHeader; }
     public String getDelimString() { return _delimString; }
@@ -286,24 +87,6 @@ public class TfUtils implements Serializable{
     public String[] getNAStrings() { return _NAstrings; }
     public long getNumCols() { return _numInputCols; }

-    public String getSpec() { return _spec; }
-    public String getTfMtdDir() { return _tfMtdDir; }
-    public String getOffsetFile() { return _offsetFile; }
-    public String getTmpDir() { return _tmpDir; }
-    public String getOutputPath() { return _outputPath; }
-
-    public String getName(int colID) { return _outputColumnNames[colID-1]; }
-
-    public void setValid(long n) { _numValidRecords = n;}
-    public void incrTotal() { _numRecordsInPartFile++; }
-    public void setTotal(long n) { _numRecordsInPartFile = n;}
-
-    public OmitAgent getOmitAgent() { return _oa; }
-    public MVImputeAgent getMVImputeAgent(){ return _mia;}
-    public RecodeAgent getRecodeAgent() { return _ra; }
-    public BinAgent getBinAgent() { return _ba; }
-    public DummycodeAgent getDummycodeAgent() { return _da; }
-
     /**
      * Function that checks if the given string is one of NA strings.
      *
@@ -321,229 +104,4 @@ public class TfUtils implements Serializable{
         }
         return false;
     }
-
-    public String[] getWords(Text line) {
-        return getWords(line.toString());
-    }
-
-
-    public String[] getWords(String line) {
-        return getDelim().split(line.trim(), -1);
-    }
-
-    /**
-     * Process a given row to construct transformation metadata.
-     *
-     * @param line string to break into words
-     * @return string array of words from the line
-     * @throws IOException if IOException occurs
-     */
-    public String[] prepareTfMtd(String line) throws IOException {
-        String[] words = getWords(line);
-        if(!getOmitAgent().omit(words, this))
-        {
-            getMVImputeAgent().prepare(words);
-            getRecodeAgent().prepare(words, this);
-            getBinAgent().prepare(words, this);
-            incrValid();
-        }
-        incrTotal();
-
-        return words;
-    }
-
-    public void loadTfMetadata() throws IOException
-    {
-        JobConf job = ConfigurationManager.getCachedJobConf();
-        loadTfMetadata(job, false);
-    }
-
-    public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
-    {
-        Path tfMtdDir = null;
-        FileSystem fs = null;
-
-        if(fromLocalFS) {
-            // metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
-            tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
-            fs = FileSystem.getLocal(job);
-        }
-        else {
-            tfMtdDir = new Path(getTfMtdDir());
-            fs = IOUtilFunctions.getFileSystem(tfMtdDir, job);
-        }
-
-        // load transformation metadata
-        getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-        getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-        getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
-        // associate recode maps and bin definitions with dummycoding agent,
-        // as recoded and binned columns are typically dummycoded
-        getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
-        getDummycodeAgent().setNumBins(getBinAgent().getColList(), getBinAgent().getNumBins());
-        getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
-    }
-
-    public String processHeaderLine() throws IOException
-    {
-        //TODO: fix hard-wired header propagation to meta data column names
-
-        FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
-        String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
-        getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
-
-        // write header information (before and after transformation) to temporary path
-        // these files are copied into txMtdPath, once the ApplyTf job is complete.
-        DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
-
-        return dcdHeader;
-        //_numTransformedColumns = getDelim().split(dcdHeader, -1).length;
-        //return _numTransformedColumns;
-    }
-
-    public boolean omit(String[] words) {
-        if(getOmitAgent() == null)
-            return false;
-        return getOmitAgent().omit(words, this);
-    }
-
-    /**
-     * Function to apply transformation metadata on a given row.
-     *
-     * @param words string array of words
-     * @return string array of transformed words
-     */
-    public String[] apply( String[] words ) {
-        words = getMVImputeAgent().apply(words);
-        words = getRecodeAgent().apply(words);
-        words = getBinAgent().apply(words);
-        words = getDummycodeAgent().apply(words);
-        _numTransformedRows++;
-
-        return words;
-    }
-
-    public void check(String []words) throws DMLRuntimeException
-    {
-        boolean checkEmptyString = ( getNAStrings() != null );
-        if ( checkEmptyString )
-        {
-            final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-            for(int i=0; i<words.length; i++)
-                if ( words[i] != null && words[i].equals(""))
-                    throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-        }
-    }
-
-    public String checkAndPrepOutputString(String []words) throws DMLRuntimeException {
-        return checkAndPrepOutputString(words, new StringBuilder());
-    }
-
-    public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException
-    {
-        /*
-         * Check if empty strings ("") have to be handled.
-         *
-         * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
-         * When na.strings are provided, then "" is considered a missing value indicator, and the
-         * user is expected to provide an appropriate imputation method. Therefore, when na.strings
-         * are provided, "" encountered in any column (after all transformations are applied)
-         * denotes an erroneous condition.
-         */
-        boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
-
-        //StringBuilder sb = new StringBuilder();
-        sb.setLength(0);
-        int i =0;
-
-        if ( checkEmptyString )
-        {
-            final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-            if ( words[0] != null )
-                if ( words[0].equals("") )
-                    throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
-                else
-                    sb.append(words[0]);
-            else
-                sb.append("0");
-
-            for(i=1; i<words.length; i++)
-            {
-                sb.append(_delimString);
-
-                if ( words[i] != null )
-                    if ( words[i].equals("") )
-                        throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-                    else
-                        sb.append(words[i]);
-                else
-                    sb.append("0");
-            }
-        }
-        else
-        {
-            sb.append(words[0] != null ? words[0] : "0");
-            for(i=1; i<words.length; i++)
-            {
-                sb.append(_delimString);
-                sb.append(words[i] != null ? words[i] : "0");
-            }
-        }
-
-        return sb.toString();
-    }
-
-    private Reader initOffsetsReader(JobConf job) throws IOException
-    {
-        Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-        FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-        Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
-        if ( files.length != 1 )
-            throw new IOException("Expecting a single file under counters file: " + path.toString());
-
-        Reader reader = new SequenceFile.Reader(fs, files[0], job);
-
-        return reader;
-    }
-
-    /**
-     * Function to generate custom file names (transform-part-.....) for
-     * mappers' output for ApplyTfCSV job. The idea is to find the index
-     * of (thisfile, fileoffset) in the list of all offsets from the
-     * counters/offsets file, which was generated from either GenTfMtdMR
-     * or AssignRowIDMR job.
-     *
-     * @param job job configuration
-     * @param offset file offset
-     * @return part file id (ie, 00001, 00002, etc)
-     * @throws IOException if IOException occurs
-     */
-    public String getPartFileID(JobConf job, long offset) throws IOException
-    {
-        Reader reader = null;
-        int id = 0;
-        try {
-            reader = initOffsetsReader(job);
-            ByteWritable key=new ByteWritable();
-            OffsetCount value=new OffsetCount();
-            String thisFile = TfUtils.getPartFileName(job);
-            while (reader.next(key, value)) {
-                if ( thisFile.equals(value.filename) && value.fileOffset == offset )
-                    break;
-                id++;
-            }
-        }
-        finally {
-            IOUtilFunctions.closeSilently(reader);
-        }
-
-        String sid = Integer.toString(id);
-        char[] carr = new char[5-sid.length()];
-        Arrays.fill(carr, '0');
-        String ret = (new String(carr)).concat(sid);
-
-        return ret;
-    }
 }
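For readers tracing the removed MR path: getPartFileID above derives its five-digit part-file suffix (00001, 00002, ...) by hand-filling a char array with '0' and concatenating. A minimal stand-alone sketch of that padding step using a format string instead; the class and method names are illustrative only, not part of this codebase:

public class PartFileIdPadding {
    // Hypothetical helper: left-pad a part-file index to five digits,
    // equivalent to the char[]-fill-and-concat in the removed getPartFileID
    // for indexes of up to five digits.
    static String padPartFileId(int id) {
        return String.format("%05d", id);
    }

    public static void main(String[] args) {
        System.out.println(padPartFileId(7));   // prints 00007
        System.out.println(padPartFileId(123)); // prints 00123
    }
}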
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java index a3f01a1..304dcdb 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java @@ -19,20 +19,10 @@ package org.apache.sysml.runtime.transform.encode; -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; -import java.util.Iterator; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.transform.DistinctValue; -import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.wink.json4j.JSONArray; @@ -152,11 +142,4 @@ public abstract class Encoder implements Serializable * @param meta frame block */ public abstract void initMetaData(FrameBlock meta); - - - //OLD API: kept for a transition phase only - //TODO stage 2: refactor data and meta data IO into minimal set of ultility functions - abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException; - abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException; - abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException; } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java new file mode 100644 index 0000000..fbe6994 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.transform.encode; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.wink.json4j.JSONArray; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; +import org.apache.sysml.lops.Lop; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; +import org.apache.sysml.runtime.util.UtilFunctions; + +public class EncoderBin extends Encoder +{ + private static final long serialVersionUID = 1917445005206076078L; + + public static final String MIN_PREFIX = "min"; + public static final String MAX_PREFIX = "max"; + public static final String NBINS_PREFIX = "nbins"; + + private int[] _numBins = null; + private double[] _min=null, _max=null; // min and max among non-missing values + private double[] _binWidths = null; // width of a bin for each attribute + + //frame transform-apply attributes + private double[][] _binMins = null; + private double[][] _binMaxs = null; + + public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen) + throws JSONException, IOException + { + this(parsedSpec, colnames, clen, false); + } + + public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly) + throws JSONException, IOException + { + super( null, clen ); + if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) ) + return; + + if( colsOnly ) { + List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames); + initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0]))); + } + else + { + JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN); + JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); + JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS); + initColList(attrs); + + _numBins = new int[attrs.size()]; + for(int i=0; i < _numBins.length; i++) + _numBins[i] = UtilFunctions.toInt(nbins.get(i)); + + // initialize internal transformation metadata + _min = new double[_colList.length]; + Arrays.fill(_min, Double.MAX_VALUE); + _max = new double[_colList.length]; + Arrays.fill(_max, -Double.MAX_VALUE); + + _binWidths = new double[_colList.length]; + } + } + + public void prepare(String[] words, TfUtils agents) { + if ( !isApplicable() ) + return; + + for(int i=0; i <_colList.length; i++) { + int colID = _colList[i]; + + String w = null; + double d = 0; + + // equi-width + w = UtilFunctions.unquote(words[colID-1].trim()); + if(!TfUtils.isNA(agents.getNAStrings(),w)) { + d = UtilFunctions.parseToDouble(w); + if(d < _min[i]) + _min[i] = d; + if(d > _max[i]) + _max[i] = d; + } + } + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + build(in); + return apply(in, out); + } + + @Override + public void build(FrameBlock in) { + // nothing to do + } + + /** + * Method to apply transformations. 
+ */ + @Override + public String[] apply(String[] words) { + if( !isApplicable() ) + return words; + + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; + try { + double val = UtilFunctions.parseToDouble(words[colID-1]); + int binid = 1; + double tmp = _min[i] + _binWidths[i]; + while(val > tmp && binid < _numBins[i]) { + tmp += _binWidths[i]; + binid++; + } + words[colID-1] = Integer.toString(binid); + } + catch(NumberFormatException e) { + throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method."); + } + } + + return words; + } + + @Override + public MatrixBlock apply(FrameBlock in, MatrixBlock out) { + for(int j=0; j<_colList.length; j++) { + int colID = _colList[j]; + for( int i=0; i<in.getNumRows(); i++ ) { + double inVal = UtilFunctions.objectToDouble( + in.getSchema()[colID-1], in.get(i, colID-1)); + int ix = Arrays.binarySearch(_binMaxs[j], inVal); + int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1; + out.quickSetValue(i, colID-1, binID); + } + } + return out; + } + + @Override + public FrameBlock getMetaData(FrameBlock meta) { + return meta; + } + + @Override + public void initMetaData(FrameBlock meta) { + _binMins = new double[_colList.length][]; + _binMaxs = new double[_colList.length][]; + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based + int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct(); + _binMins[j] = new double[nbins]; + _binMaxs[j] = new double[nbins]; + for( int i=0; i<nbins; i++ ) { + String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX); + _binMins[j][i] = Double.parseDouble(tmp[0]); + _binMaxs[j][i] = Double.parseDouble(tmp[1]); + } + } + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java index 9efbc19..deff887 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java @@ -19,20 +19,11 @@ package org.apache.sysml.runtime.transform.encode; -import java.io.IOException; -import java.util.Iterator; import java.util.List; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.transform.DistinctValue; -import org.apache.sysml.runtime.transform.TfUtils; /** * Simple composite encoder that applies a list of encoders @@ -90,7 +81,6 @@ public class EncoderComposite extends Encoder encoder.build(in); } - @Override public String[] apply(String[] in) { for( Encoder encoder : _encoders ) @@ -119,19 +109,4 @@ public class EncoderComposite extends Encoder for( Encoder encoder : _encoders ) encoder.initMetaData(out); } - - @Override - public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, 
TfUtils agents) throws IOException { - throw new RuntimeException("File-based api not supported."); - } - - @Override - public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { - throw new RuntimeException("File-based api not supported."); - } - - @Override - public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - throw new RuntimeException("File-based api not supported."); - } } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java new file mode 100644 index 0000000..743381a --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform.encode; + +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; +import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; + +public class EncoderDummycode extends Encoder +{ + private static final long serialVersionUID = 5832130477659116489L; + + private int[] _domainSizes = null; // length = #of dummycoded columns + private long _dummycodedLength = 0; // #of columns after dummycoded + + public EncoderDummycode(JSONObject parsedSpec, String[] colnames, int clen) throws JSONException { + super(null, clen); + + if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) { + int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_DUMMYCODE); + initColList(collist); + } + } + + @Override + public int getNumCols() { + return (int)_dummycodedLength; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + return apply(in, out); + } + + @Override + public void build(FrameBlock in) { + //do nothing + } + + /** + * Method to apply transformations. 
+ * + * @param words array of strings + * @return array of transformed strings + */ + @Override + public String[] apply(String[] words) + { + if( !isApplicable() ) + return words; + + String[] nwords = new String[(int)_dummycodedLength]; + int rcdVal = 0; + + for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) { + if(idx < _colList.length && colID==_colList[idx]) { + // dummycoded columns + try { + rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1])); + nwords[ ncolID-1+rcdVal-1 ] = "1"; + ncolID += _domainSizes[idx]; + idx++; + } + catch (Exception e) { + throw new RuntimeException("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength); + } + } + else { + nwords[ncolID-1] = words[colID-1]; + ncolID++; + } + } + + return nwords; + } + + @Override + public MatrixBlock apply(FrameBlock in, MatrixBlock out) + { + MatrixBlock ret = new MatrixBlock(out.getNumRows(), (int)_dummycodedLength, false); + + for( int i=0; i<out.getNumRows(); i++ ) { + for(int colID=1, idx=0, ncolID=1; colID <= out.getNumColumns(); colID++) { + double val = out.quickGetValue(i, colID-1); + if(idx < _colList.length && colID==_colList[idx]) { + ret.quickSetValue(i, ncolID-1+(int)val-1, 1); + ncolID += _domainSizes[idx]; + idx++; + } + else { + double ptval = UtilFunctions.objectToDouble(in.getSchema()[colID-1], in.get(i, colID-1)); + ret.quickSetValue(i, ncolID-1, ptval); + ncolID++; + } + } + } + + return ret; + } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + return out; + } + + @Override + public void initMetaData(FrameBlock meta) { + //initialize domain sizes and output num columns + _domainSizes = new int[_colList.length]; + _dummycodedLength = _clen; + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based + _domainSizes[j] = (int)meta.getColumnMetadata()[colID-1].getNumDistinct(); + _dummycodedLength += _domainSizes[j]-1; + } + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java index f7ceefd..13b2810 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java @@ -28,11 +28,6 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; -import org.apache.sysml.runtime.transform.BinAgent; -import org.apache.sysml.runtime.transform.DummycodeAgent; -import org.apache.sysml.runtime.transform.MVImputeAgent; -import org.apache.sysml.runtime.transform.OmitAgent; -import org.apache.sysml.runtime.transform.RecodeAgent; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.UtilFunctions; @@ -40,7 +35,6 @@ import org.apache.wink.json4j.JSONObject; public class EncoderFactory { - public static Encoder createEncoder(String spec, String[] colnames, int clen, FrameBlock meta) throws DMLRuntimeException { return createEncoder(spec, colnames, UtilFunctions.nCopies(clen, 
ValueType.STRING), meta); } @@ -79,7 +73,7 @@ public class EncoderFactory //create individual encoders if( !rcIDs.isEmpty() ) { - RecodeAgent ra = new RecodeAgent(jSpec, colnames, clen); + EncoderRecode ra = new EncoderRecode(jSpec, colnames, clen); ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0]))); lencoders.add(ra); } @@ -87,13 +81,13 @@ public class EncoderFactory lencoders.add(new EncoderPassThrough( ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen)); if( !dcIDs.isEmpty() ) - lencoders.add(new DummycodeAgent(jSpec, colnames, schema.length)); + lencoders.add(new EncoderDummycode(jSpec, colnames, schema.length)); if( !binIDs.isEmpty() ) - lencoders.add(new BinAgent(jSpec, colnames, schema.length, true)); + lencoders.add(new EncoderBin(jSpec, colnames, schema.length, true)); if( !oIDs.isEmpty() ) - lencoders.add(new OmitAgent(jSpec, colnames, schema.length)); + lencoders.add(new EncoderOmit(jSpec, colnames, schema.length)); if( !mvIDs.isEmpty() ) { - MVImputeAgent ma = new MVImputeAgent(jSpec, colnames, schema.length); + EncoderMVImpute ma = new EncoderMVImpute(jSpec, colnames, schema.length); ma.initRecodeIDList(rcIDs); lencoders.add(ma); } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java new file mode 100644 index 0000000..55a0bde --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.transform.encode; + +import java.io.IOException; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.wink.json4j.JSONArray; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; +import org.apache.sysml.runtime.functionobjects.CM; +import org.apache.sysml.runtime.functionobjects.Mean; +import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; +import org.apache.sysml.runtime.instructions.cp.KahanObject; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; +import org.apache.sysml.runtime.util.UtilFunctions; + +public class EncoderMVImpute extends Encoder +{ + private static final long serialVersionUID = 9057868620144662194L; + + public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT }; + + private MVMethod[] _mvMethodList = null; + private MVMethod[] _mvscMethodList = null; // scaling methods for attributes that are imputed and also scaled + + private BitSet _isMVScaled = null; + private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE); // function object that understands variance computation + + // objects required to compute mean and variance of all non-missing entries + private Mean _meanFn = Mean.getMeanFnObject(); // function object that understands mean computation + private KahanObject[] _meanList = null; // column-level means, computed so far + private long[] _countList = null; // #of non-missing values + + private CM_COV_Object[] _varList = null; // column-level variances, computed so far (for scaling) + + private int[] _scnomvList = null; // List of attributes that are scaled but not imputed + private MVMethod[] _scnomvMethodList = null; // scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring + private KahanObject[] _scnomvMeanList = null; // column-level means, for attributes scaled but not imputed + private long[] _scnomvCountList = null; // #of non-missing values, for attributes scaled but not imputed + private CM_COV_Object[] _scnomvVarList = null; // column-level variances, computed so far + + private String[] _replacementList = null; // replacements: for global_mean, mean; and for global_mode, recode id of mode category + private String[] _NAstrings = null; + private List<Integer> _rcList = null; + private HashMap<Integer,HashMap<String,Long>> _hist = null; + + public String[] getReplacements() { return _replacementList; } + public KahanObject[] getMeans() { return _meanList; } + public CM_COV_Object[] getVars() { return _varList; } + public KahanObject[] getMeans_scnomv() { return _scnomvMeanList; } + public CM_COV_Object[] getVars_scnomv() { return _scnomvVarList; } + + public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, int clen) + throws JSONException + { + super(null, clen); + + //handle column list + int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, colnames, TfUtils.TXMETHOD_IMPUTE); + initColList(collist); + + //handle method list + parseMethodsAndReplacments(parsedSpec); + + //create reuse histograms + _hist = new HashMap<Integer, HashMap<String,Long>>(); + } + + public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, String[] NAstrings, int clen) + throws JSONException + { + 
super(null, clen); + boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE); + boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE); + _NAstrings = NAstrings; + + if(!isMV) { + // MV Impute is not applicable + _colList = null; + _mvMethodList = null; + _meanList = null; + _countList = null; + _replacementList = null; + } + else { + JSONObject mvobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE); + JSONArray mvattrs = (JSONArray) mvobj.get(TfUtils.JSON_ATTRS); + JSONArray mvmthds = (JSONArray) mvobj.get(TfUtils.JSON_MTHD); + int mvLength = mvattrs.size(); + + _colList = new int[mvLength]; + _mvMethodList = new MVMethod[mvLength]; + + _meanList = new KahanObject[mvLength]; + _countList = new long[mvLength]; + _varList = new CM_COV_Object[mvLength]; + + _isMVScaled = new BitSet(_colList.length); + _isMVScaled.clear(); + + for(int i=0; i < _colList.length; i++) { + _colList[i] = UtilFunctions.toInt(mvattrs.get(i)); + _mvMethodList[i] = MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))]; + _meanList[i] = new KahanObject(0, 0); + } + + _replacementList = new String[mvLength]; // contains replacements for all columns (scale and categorical) + + JSONArray constants = (JSONArray)mvobj.get(TfUtils.JSON_CONSTS); + for(int i=0; i < constants.size(); i++) { + if ( constants.get(i) == null ) + _replacementList[i] = "NaN"; + else + _replacementList[i] = constants.get(i).toString(); + } + } + + // Handle scaled attributes + if ( !isSC ) + { + // scaling is not applicable + _scnomvCountList = null; + _scnomvMeanList = null; + _scnomvVarList = null; + } + else + { + if ( _colList != null ) + _mvscMethodList = new MVMethod[_colList.length]; + + JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE); + JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS); + JSONArray scmthds = (JSONArray) scobj.get(TfUtils.JSON_MTHD); + int scLength = scattrs.size(); + + int[] _allscaled = new int[scLength]; + int scnomv = 0, colID; + byte mthd; + for(int i=0; i < scLength; i++) + { + colID = UtilFunctions.toInt(scattrs.get(i)); + mthd = (byte) UtilFunctions.toInt(scmthds.get(i)); + + _allscaled[i] = colID; + + // check if the attribute is also MV imputed + int mvidx = isApplicable(colID); + if(mvidx != -1) + { + _isMVScaled.set(mvidx); + _mvscMethodList[mvidx] = MVMethod.values()[mthd]; + _varList[mvidx] = new CM_COV_Object(); + } + else + scnomv++; // count of scaled but not imputed + } + + if(scnomv > 0) + { + _scnomvList = new int[scnomv]; + _scnomvMethodList = new MVMethod[scnomv]; + + _scnomvMeanList = new KahanObject[scnomv]; + _scnomvCountList = new long[scnomv]; + _scnomvVarList = new CM_COV_Object[scnomv]; + + for(int i=0, idx=0; i < scLength; i++) + { + colID = UtilFunctions.toInt(scattrs.get(i)); + mthd = (byte)UtilFunctions.toInt(scmthds.get(i)); + + if(isApplicable(colID) == -1) + { // scaled but not imputed + _scnomvList[idx] = colID; + _scnomvMethodList[idx] = MVMethod.values()[mthd]; + _scnomvMeanList[idx] = new KahanObject(0, 0); + _scnomvVarList[idx] = new CM_COV_Object(); + idx++; + } + } + } + } + } + + private void parseMethodsAndReplacments(JSONObject parsedSpec) throws JSONException { + JSONArray mvspec = (JSONArray) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE); + _mvMethodList = new MVMethod[mvspec.size()]; + _replacementList = new String[mvspec.size()]; + _meanList = new KahanObject[mvspec.size()]; + _countList = new long[mvspec.size()]; + for(int i=0; i < mvspec.size(); i++) { + JSONObject mvobj = (JSONObject)mvspec.get(i); + _mvMethodList[i] = 
MVMethod.valueOf(mvobj.get("method").toString().toUpperCase()); + if( _mvMethodList[i] == MVMethod.CONSTANT ) { + _replacementList[i] = mvobj.getString("value").toString(); + } + _meanList[i] = new KahanObject(0, 0); + } + } + + public void prepare(String[] words) throws IOException { + + try { + String w = null; + if(_colList != null) + for(int i=0; i <_colList.length; i++) { + int colID = _colList[i]; + w = UtilFunctions.unquote(words[colID-1].trim()); + + try { + if(!TfUtils.isNA(_NAstrings, w)) { + _countList[i]++; + + boolean computeMean = (_mvMethodList[i] == MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) ); + if(computeMean) { + // global_mean + double d = UtilFunctions.parseToDouble(w); + _meanFn.execute2(_meanList[i], d, _countList[i]); + + if (_isMVScaled.get(i) && _mvscMethodList[i] == MVMethod.GLOBAL_MODE) + _varFn.execute(_varList[i], d); + } + else { + // global_mode or constant + // Nothing to do here. Mode is computed using recode maps. + } + } + } catch (NumberFormatException e) + { + throw new RuntimeException("Encountered \"" + w + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + w + "\" to na.strings, along with an appropriate imputation method."); + } + } + + // Compute mean and variance for attributes that are scaled but not imputed + if(_scnomvList != null) + for(int i=0; i < _scnomvList.length; i++) + { + int colID = _scnomvList[i]; + w = UtilFunctions.unquote(words[colID-1].trim()); + double d = UtilFunctions.parseToDouble(w); + _scnomvCountList[i]++; // not required, this is always equal to total #records processed + _meanFn.execute2(_scnomvMeanList[i], d, _scnomvCountList[i]); + if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE) + _varFn.execute(_scnomvVarList[i], d); + } + } catch(Exception e) { + throw new IOException(e); + } + } + + public MVMethod getMethod(int colID) { + int idx = isApplicable(colID); + if(idx == -1) + return MVMethod.INVALID; + else + return _mvMethodList[idx]; + } + + public long getNonMVCount(int colID) { + int idx = isApplicable(colID); + return (idx == -1) ? 0 : _countList[idx]; + } + + public String getReplacement(int colID) { + int idx = isApplicable(colID); + return (idx == -1) ? null : _replacementList[idx]; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + build(in); + return apply(in, out); + } + + @Override + public void build(FrameBlock in) { + try { + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; + if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) { + //compute global column mean (scale) + long off = _countList[j]; + for( int i=0; i<in.getNumRows(); i++ ) + _meanFn.execute2(_meanList[j], UtilFunctions.objectToDouble( + in.getSchema()[colID-1], in.get(i, colID-1)), off+i+1); + _replacementList[j] = String.valueOf(_meanList[j]._sum); + _countList[j] += in.getNumRows(); + } + else if( _mvMethodList[j] == MVMethod.GLOBAL_MODE ) { + //compute global column mode (categorical), i.e., most frequent category + HashMap<String,Long> hist = _hist.containsKey(colID) ? + _hist.get(colID) : new HashMap<String,Long>(); + for( int i=0; i<in.getNumRows(); i++ ) { + String key = String.valueOf(in.get(i, colID-1)); + if( key != null && !key.isEmpty() ) { + Long val = hist.get(key); + hist.put(key, (val!=null) ? 
val+1 : 1); + } + } + _hist.put(colID, hist); + long max = Long.MIN_VALUE; + for( Entry<String, Long> e : hist.entrySet() ) + if( e.getValue() > max ) { + _replacementList[j] = e.getKey(); + max = e.getValue(); + } + } + } + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public String[] apply(String[] words) + { + if( isApplicable() ) + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; + String w = UtilFunctions.unquote(words[colID-1]); + if(TfUtils.isNA(_NAstrings, w)) + w = words[colID-1] = _replacementList[i]; + + if ( _isMVScaled.get(i) ) + if ( _mvscMethodList[i] == MVMethod.GLOBAL_MEAN ) + words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum ); + else + words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum ); + } + + if(_scnomvList != null) + for(int i=0; i < _scnomvList.length; i++) + { + int colID = _scnomvList[i]; + if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN ) + words[colID-1] = Double.toString( UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum ); + else + words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / _scnomvVarList[i].mean._sum ); + } + + return words; + } + + @Override + public MatrixBlock apply(FrameBlock in, MatrixBlock out) { + for(int i=0; i<in.getNumRows(); i++) { + for(int j=0; j<_colList.length; j++) { + int colID = _colList[j]; + if( Double.isNaN(out.quickGetValue(i, colID-1)) ) + out.quickSetValue(i, colID-1, Double.parseDouble(_replacementList[j])); + } + } + return out; + } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + for( int j=0; j<_colList.length; j++ ) { + out.getColumnMetadata(_colList[j]-1) + .setMvValue(_replacementList[j]); + } + return out; + } + + public void initMetaData(FrameBlock meta) { + //init replacement lists, replace recoded values to + //apply mv imputation potentially after recoding + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; + String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); + if( _rcList.contains(colID) ) { + Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal); + if( mvVal2 == null) + throw new RuntimeException("Missing recode value for impute value '"+mvVal+"' (colID="+colID+")."); + _replacementList[j] = mvVal2.toString(); + } + else { + _replacementList[j] = mvVal; + } + } + } + + public void initRecodeIDList(List<Integer> rcList) { + _rcList = rcList; + } + + /** + * Exposes the internal histogram after build. + * + * @param colID column ID + * @return histogram (map of string keys and long values) + */ + public HashMap<String,Long> getHistogram( int colID ) { + return _hist.get(colID); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java new file mode 100644 index 0000000..af09cee --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform.encode; + +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; +import org.apache.sysml.runtime.util.UtilFunctions; + +public class EncoderOmit extends Encoder +{ + private static final long serialVersionUID = 1978852120416654195L; + + private int _rmRows = 0; + + public EncoderOmit(JSONObject parsedSpec, String[] colnames, int clen) + throws JSONException + { + super(null, clen); + if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT)) + return; + int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_OMIT); + initColList(collist); + } + + public int getNumRemovedRows() { + return _rmRows; + } + + public boolean omit(String[] words, TfUtils agents) + { + if( !isApplicable() ) + return false; + + for(int i=0; i<_colList.length; i++) { + int colID = _colList[i]; + if(TfUtils.isNA(agents.getNAStrings(),UtilFunctions.unquote(words[colID-1].trim()))) + return true; + } + return false; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + return apply(in, out); + } + + @Override + public void build(FrameBlock in) { + //do nothing + } + + @Override + public String[] apply(String[] words) { + return null; + } + + @Override + public MatrixBlock apply(FrameBlock in, MatrixBlock out) + { + //determine output size + int numRows = 0; + for(int i=0; i<out.getNumRows(); i++) { + boolean valid = true; + for(int j=0; j<_colList.length; j++) + valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1)); + numRows += valid ? 
1 : 0; + } + + //copy over valid rows into the output + MatrixBlock ret = new MatrixBlock(numRows, out.getNumColumns(), false); + int pos = 0; + for(int i=0; i<in.getNumRows(); i++) { + //determine if valid row or omit + boolean valid = true; + for(int j=0; j<_colList.length; j++) + valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1)); + //copy row if necessary + if( valid ) { + for(int j=0; j<out.getNumColumns(); j++) + ret.quickSetValue(pos, j, out.quickGetValue(i, j)); + pos++; + } + } + + //keep info an remove rows + _rmRows = out.getNumRows() - pos; + + return ret; + } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + //do nothing + return out; + } + + @Override + public void initMetaData(FrameBlock meta) { + //do nothing + } +} + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java index 08722fd..d84ea0d 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java @@ -19,19 +19,10 @@ package org.apache.sysml.runtime.transform.encode; -import java.io.IOException; -import java.util.Iterator; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.transform.DistinctValue; -import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.UtilFunctions; /** @@ -89,20 +80,4 @@ public class EncoderPassThrough extends Encoder public void initMetaData(FrameBlock meta) { //do nothing } - - - @Override - public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { - throw new RuntimeException("File-based api not supported."); - } - - @Override - public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { - throw new RuntimeException("File-based api not supported."); - } - - @Override - public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - throw new RuntimeException("File-based api not supported."); - } } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java new file mode 100644 index 0000000..bb8592c --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
new file mode 100644
index 0000000..bb8592c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+public class EncoderRecode extends Encoder
+{
+	private static final long serialVersionUID = 8213163881283341874L;
+
+	private int[] _mvrcdList = null;
+	private int[] _fullrcdList = null;
+
+	//recode maps and custom map for partial recode maps
+	private HashMap<Integer, HashMap<String, Long>> _rcdMaps = new HashMap<Integer, HashMap<String, Long>>();
+	private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+	private HashMap<Integer, HashSet<Object>> _rcdMapsPart = null;
+
+	public EncoderRecode(JSONObject parsedSpec, String[] colnames, int clen)
+		throws JSONException
+	{
+		super(null, clen);
+		int rcdCount = 0;
+
+		if( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) {
+			int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_RECODE);
+			rcdCount = initColList(collist);
+		}
+
+		if( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD) ) {
+			_mvrcdList = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_MVRCD);
+			rcdCount += _mvrcdList.length;
+		}
+
+		if( rcdCount > 0 ) {
+			_fullrcdList = new int[rcdCount];
+			int idx = -1;
+			if(_colList != null)
+				for(int i=0; i < _colList.length; i++)
+					_fullrcdList[++idx] = _colList[i];
+
+			if(_mvrcdList != null)
+				for(int i=0; i < _mvrcdList.length; i++)
+					_fullrcdList[++idx] = _mvrcdList[i];
+		}
+	}
+
+	public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() {
+		return _rcdMaps;
+	}
+
+	public HashMap<Integer, HashSet<Object>> getCPRecodeMapsPartial() {
+		return _rcdMapsPart;
+	}
+
+	public HashMap<Integer, HashMap<String,String>> getRecodeMaps() {
+		return _finalMaps;
+	}
+
+	private String lookupRCDMap(int colID, String key) {
+		if( _finalMaps != null )
+			return _finalMaps.get(colID).get(key);
+		else { //used for cp
+			Long tmp = _rcdMaps.get(colID).get(key);
+			return (tmp!=null) ? Long.toString(tmp) : null;
+		}
+	}
+
+	@Override
+	public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+		if( !isApplicable() )
+			return out;
+
+		//build and apply recode maps
+		build(in);
+		apply(in, out);
+
+		return out;
+	}
+
+	@Override
+	public void build(FrameBlock in) {
+		if( !isApplicable() )
+			return;
+
+		Iterator<String[]> iter = in.getStringRowIterator();
+		while( iter.hasNext() ) {
+			String[] row = iter.next();
+			for( int j=0; j<_colList.length; j++ ) {
+				int colID = _colList[j]; //1-based
+				//allocate column map if necessary
+				if( !_rcdMaps.containsKey(colID) )
+					_rcdMaps.put(colID, new HashMap<String,Long>());
+				//probe and build column map
+				HashMap<String,Long> map = _rcdMaps.get(colID);
+				String key = row[colID-1];
+				if( key!=null && !key.isEmpty() && !map.containsKey(key) )
+					map.put(key, Long.valueOf(map.size()+1));
+			}
+		}
+	}
+
+	public void buildPartial(FrameBlock in) {
+		if( !isApplicable() )
+			return;
+
+		//ensure allocated partial recode map
+		if( _rcdMapsPart == null )
+			_rcdMapsPart = new HashMap<Integer, HashSet<Object>>();
+
+		//construct partial recode map (tokens w/o codes)
+		//iterate over columns for sequential access
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			//allocate column map if necessary
+			if( !_rcdMapsPart.containsKey(colID) )
+				_rcdMapsPart.put(colID, new HashSet<Object>());
+			HashSet<Object> map = _rcdMapsPart.get(colID);
+			//probe and build column map
+			for( int i=0; i<in.getNumRows(); i++ )
+				map.add(in.get(i, colID-1));
+			//cleanup unnecessary entries once
+			map.remove(null);
+			map.remove("");
+		}
+	}
+
+	/**
+	 * Method to apply transformations.
+	 */
+	@Override
+	public String[] apply(String[] words)
+	{
+		if( !isApplicable() )
+			return words;
+
+		//apply recode maps on relevant columns of given row
+		for(int i=0; i < _colList.length; i++) {
+			//prepare input and get code
+			int colID = _colList[i];
+			String key = UtilFunctions.unquote(words[colID-1].trim());
+			String val = lookupRCDMap(colID, key);
+			//replace unseen keys with NaN
+			words[colID-1] = (val!=null) ? val : "NaN";
+		}
+
+		return words;
+	}
+
+	@Override
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+		//apply recode maps column wise
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j];
+			for( int i=0; i<in.getNumRows(); i++ ) {
+				Object okey = in.get(i, colID-1);
+				String key = (okey!=null) ? okey.toString() : null;
+				String val = lookupRCDMap(colID, key);
+				out.quickSetValue(i, colID-1, (val!=null) ? Double.parseDouble(val) : Double.NaN);
+			}
+		}
+
+		return out;
+	}
+
+	@Override
+	public FrameBlock getMetaData(FrameBlock meta) {
+		if( !isApplicable() )
+			return meta;
+
+		//inverse operation to initRecodeMaps
+
+		//allocate output rows
+		int maxDistinct = 0;
+		for( int j=0; j<_colList.length; j++ )
+			if( _rcdMaps.containsKey(_colList[j]) )
+				maxDistinct = Math.max(maxDistinct, _rcdMaps.get(_colList[j]).size());
+		meta.ensureAllocatedColumns(maxDistinct);
+
+		//create compact meta data representation
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			int rowID = 0;
+			if( _rcdMaps.containsKey(_colList[j]) )
+				for( Entry<String, Long> e : _rcdMaps.get(colID).entrySet() ) {
+					String tmp = constructRecodeMapEntry(e.getKey(), e.getValue());
+					meta.set(rowID++, colID-1, tmp);
+				}
+			meta.getColumnMetadata(colID-1).setNumDistinct(
+					_rcdMaps.get(colID).size());
+		}
+
+		return meta;
+	}
+
+	/**
+	 * Construct the recode maps from the given input frame for all
+	 * columns registered for recode.
+	 *
+	 * @param meta frame block
+	 */
+	public void initMetaData( FrameBlock meta ) {
+		if( meta == null || meta.getNumRows()<=0 )
+			return;
+
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			_rcdMaps.put(colID, meta.getRecodeMap(colID-1));
+		}
+	}
+
+	/**
+	 * Returns the recode map entry, which consists of the concatenation
+	 * of token, delimiter, and code.
+	 *
+	 * @param token token part of the recode map entry
+	 * @param code code assigned to the token
+	 * @return concatenation of token and code with delimiter in between
+	 */
+	public static String constructRecodeMapEntry(String token, Long code) {
+		return token + Lop.DATATYPE_PREFIX + code.toString();
+	}
+}
\ No newline at end of file
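[Editor's note] A minimal sketch (illustrative only, not part of this commit) of the build/apply life cycle on a single recoded column; the class name EncoderRecodeSketch, the spec literal with the "ids" flag, and the sample tokens are assumptions:

    //illustrative sketch only, not part of this commit
    import org.apache.sysml.parser.Expression.ValueType;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;
    import org.apache.sysml.runtime.transform.encode.EncoderRecode;
    import org.apache.wink.json4j.JSONObject;

    public class EncoderRecodeSketch {
        public static void main(String[] args) throws Exception {
            //assumed ID-based spec: recode column 1
            JSONObject spec = new JSONObject("{\"ids\": true, \"recode\": [1]}");
            EncoderRecode rc = new EncoderRecode(spec, new String[]{"city"}, 1);

            FrameBlock in = new FrameBlock(1, ValueType.STRING);
            in.appendRow(new String[]{"SFO"});
            in.appendRow(new String[]{"NYC"});
            in.appendRow(new String[]{"SFO"});

            //build assigns codes in order of first appearance (SFO=1, NYC=2);
            //apply then writes the codes, so out = [1, 2, 1]^T
            MatrixBlock out = rc.encode(in, new MatrixBlock(3, 1, false));
            System.out.println(out.quickGetValue(2, 0)); //expect 1.0
        }
    }

For the metadata round trip, each map entry is serialized via constructRecodeMapEntry as token, then the Lop.DATATYPE_PREFIX delimiter, then the code; initMetaData inverts this through FrameBlock.getRecodeMap.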
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
index 62b90b4..afb7ee9 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
@@ -50,7 +50,6 @@ import org.apache.wink.json4j.JSONObject;
 
 public class TfMetaUtils
 {
-
 	public static boolean isIDSpecification(String spec) throws DMLRuntimeException {
 		try {
 			JSONObject jSpec = new JSONObject(spec);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
index af2e75f..b506444 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -88,9 +88,7 @@ public class FrameFunctionTest extends AutomatedTestBase
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
 		boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
 		OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
-		OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 
 		try
 		{
@@ -126,7 +124,6 @@ public class FrameFunctionTest extends AutomatedTestBase
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 			OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
index ecc958b..c629eee 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.io.FrameWriter;
@@ -201,10 +200,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
 		if( rtplatform == RUNTIME_PLATFORM.SPARK )
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
-		if( ofmt.equals("csv") )
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
-
 		try
 		{
 			int cols = multColBlks ? cols2 : cols1;
@@ -235,7 +230,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
 		finally {
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
index 5066582..ceeec07 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
@@ -21,7 +21,6 @@ package org.apache.sysml.test.integration.functions.frame;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
@@ -101,10 +100,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
 		String ofmt = OutputInfo.outputInfoToStringExternal(oinfo);
-
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
-		if( ofmt.equals("csv") )
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 
 		try
 		{
@@ -148,7 +143,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
 		finally {
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
index 35078f3..056e619 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
@@ -22,7 +22,6 @@ package org.apache.sysml.test.integration.functions.transform;
 
 import org.junit.Test;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -75,7 +74,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
 	{
 		//set runtime platform
 		RUNTIME_PLATFORM rtold = rtplatform;
-		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
 		rtplatform = rt;
 
 		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
@@ -94,7 +92,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
 			programArgs = new String[]{"-explain","-args",
 				HOME + "input/" + DATASET, output("R") };
 
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 			runTest(true, false, null, -1);
 
 			//read input/output and compare
@@ -113,7 +110,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
 		finally {
 			rtplatform = rtold;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
deleted file mode 100644
index 81c0bab..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.transform;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-/**
- *
- *
- */
-public class RunTest extends AutomatedTestBase
-{
-
-	private final static String TEST_NAME1 = "Transform";
-	private final static String TEST_NAME2 = "Apply";
-	private final static String TEST_DIR = "functions/transform/";
-	private final static String TEST_CLASS_DIR = TEST_DIR + RunTest.class.getSimpleName() + "/";
-
-	private final static String HOMES_DATASET = "homes/homes.csv";
-	//private final static String HOMES_SPEC = "homes/homes.tfspec.json";
-	private final static String HOMES_SPEC2 = "homes/homes.tfspec2.json";
-	//private final static String HOMES_IDSPEC = "homes/homes.tfidspec.json";
-	//private final static String HOMES_TFDATA = "homes/homes.transformed.csv";
-	//private final static String HOMES_COLNAMES = "homes/homes.csv.colnames";
-
-	private final static String HOMES_NAN_DATASET = "homes/homesNAN.csv";
-	private final static String HOMES_NAN_SPEC = "homes/homesNAN.tfspec.json";
-	//private final static String HOMES_NAN_IDSPEC = "homes/homesNAN.tfidspec.json";
-	private final static String HOMES_NAN_COLNAMES = "homes/homesNAN.colnames.csv";
-
-	private final static String HOMES_MISSING_DATASET = "homes/homesAllMissing.csv";
-	private final static String HOMES_MISSING_SPEC = "homes/homesAllMissing.tfspec.json";
-	private final static String HOMES_MISSING_IDSPEC = "homes/homesAllMissing.tfidspec.json";
-
-	@Override
-	public void setUp()
-	{
-		TestUtils.clearAssertionInformation();
-		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"R"}));
-	}
-
-	// ---- NAN BinaryBlock ----
-
-	@Test
-	public void runTestWithNAN_HybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "binary");
-	}
-
-	@Test
-	public void runTestWithNAN_SPHybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-	}
-
-	@Test
-	public void runTestWithNAN_HadoopBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "binary");
-	}
-
-	@Test
-	public void runTestWithNAN_SparkBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "binary");
-	}
-
-	// ---- NAN CSV ----
-
-	@Test
-	public void runTestWithNAN_HybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "csv");
-	}
-
-	@Test
-	public void runTestWithNAN_SPHybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-	}
-
-	@Test
-	public void runTestWithNAN_HadoopCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "csv");
-	}
-
-	@Test
-	public void runTestWithNAN_SparkCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "csv");
-	}
-
-	// ---- Test2 BinaryBlock ----
-
-	@Test
-	public void runTest2_HybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "binary");
-	}
-
-	@Test
-	public void runTest2_SPHybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-	}
-
-	@Test
-	public void runTest2_HadoopBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "binary");
-	}
-
-	@Test
-	public void runTest2_SparkBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "binary");
-	}
-
-	// ---- Test2 CSV ----
-
-	@Test
-	public void runTest2_HybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "csv");
-	}
-
-	@Test
-	public void runTest2_SPHybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-	}
-
-	@Test
-	public void runTest2_HadoopCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "csv");
-	}
-
-	@Test
-	public void runTest2_SparkCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "csv");
-	}
-
-	// ---- HomesMissing BinaryBlock ----
-
-	@Test
-	public void runAllMissing_HybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID, "binary");
-	}
-
-	@Test
-	public void runAllMissing_SPHybridBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-	}
-
-	@Test
-	public void runAllMissing_HadoopBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HADOOP, "binary");
-	}
-
-	@Test
-	public void runAllMissing_SparkBB() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.SPARK, "binary");
-	}
-
-	// ---- HomesMissing CSV ----
-
-	@Test
-	public void runAllMissing_HybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID, "csv");
-	}
-
-	@Test
-	public void runAllMissing_SPHybridCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-	}
-
-	@Test
-	public void runAllMissing_HadoopCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HADOOP, "csv");
-	}
-
-	@Test
-	public void runAllMissing_SparkCSV() throws DMLRuntimeException, IOException {
-		runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.SPARK, "csv");
-	}
-
-	// ------------------
-
-	/**
-	 *
-	 * @param sparseM1
-	 * @param sparseM2
-	 * @param instType
-	 * @throws IOException
-	 * @throws DMLRuntimeException
-	 */
-	private void runScalingTest( String dataset, String spec, String colnames, boolean exception, RUNTIME_PLATFORM rt, String ofmt) throws IOException, DMLRuntimeException
-	{
-		RUNTIME_PLATFORM platformOld = rtplatform;
-		rtplatform = rt;
-
-		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-		if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)
-			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-
-		try
-		{
-			getAndLoadTestConfiguration(TEST_NAME1);
-
-			/* This is for running the junit test the new way, i.e., construct the arguments directly */
-			String HOME = SCRIPT_DIR + TEST_DIR;
-			fullDMLScriptName = null;
-
-			if (colnames == null) {
-				fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
-				programArgs = new String[]{"-nvargs",
-					"DATA=" + HOME + "input/" + dataset,
-					"TFSPEC=" + HOME + "input/" + spec,
-					"TFMTD=" + output("tfmtd"),
-					"TFDATA=" + output("tfout"),
-					"OFMT=" + ofmt };
-			}
-			else {
-				fullDMLScriptName = HOME + TEST_NAME1 + "_colnames.dml";
-				programArgs = new String[]{"-nvargs",
					"DATA=" + HOME + "input/" + dataset,
-					"TFSPEC=" + HOME + "input/" + spec,
-					"COLNAMES=" + HOME + "input/" + colnames,
-					"TFMTD=" + output("tfmtd"),
-					"TFDATA=" + output("tfout"),
-					"OFMT=" + ofmt };
-			}
-
-			boolean exceptionExpected = exception;
-			runTest(true, exceptionExpected, null, -1);
-
-			fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
-			programArgs = new String[]{"-nvargs",
-				"DATA=" + HOME + "input/" + dataset,
-				"APPLYMTD=" + output("tfmtd"), // generated above
-				"TFMTD=" + output("test_tfmtd"),
-				"TFDATA=" + output("test_tfout"),
-				"OFMT=" + ofmt };
-
-			exceptionExpected = exception;
-			runTest(true, exceptionExpected, null, -1);
-
-		}
-		finally
-		{
-			rtplatform = platformOld;
-			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-		}
-	}
-}
\ No newline at end of file
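[Editor's note] The deleted RunTest exercised the old file-based transform end to end (transform, then apply with the generated metadata). With this commit, the in-memory counterpart is a chain of the new encoders; the following rough sketch is illustrative only and not part of this commit, assuming a combined ID-based spec, the class name TransformChainSketch, and the sample tokens:

    //illustrative sketch only, not part of this commit
    import org.apache.sysml.parser.Expression.ValueType;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;
    import org.apache.sysml.runtime.transform.encode.EncoderOmit;
    import org.apache.sysml.runtime.transform.encode.EncoderRecode;
    import org.apache.wink.json4j.JSONObject;

    public class TransformChainSketch {
        public static void main(String[] args) throws Exception {
            //assumed combined ID-based spec: recode column 1, omit on column 1
            JSONObject spec = new JSONObject("{\"ids\": true, \"recode\": [1], \"omit\": [1]}");
            String[] colnames = new String[]{"zipcode"};
            EncoderRecode rc = new EncoderRecode(spec, colnames, 1);
            EncoderOmit omit = new EncoderOmit(spec, colnames, 1);

            FrameBlock train = new FrameBlock(1, ValueType.STRING);
            train.appendRow(new String[]{"95120"});
            train.appendRow(new String[]{"95136"});
            FrameBlock test = new FrameBlock(1, ValueType.STRING);
            test.appendRow(new String[]{"95136"});
            test.appendRow(new String[]{"10001"}); //token unseen during build

            rc.build(train); //recode map from training data only
            MatrixBlock enc = rc.apply(test, new MatrixBlock(test.getNumRows(), 1, false));
            //unseen "10001" was recoded to NaN, so omit drops that row
            MatrixBlock ret = omit.encode(test, enc);
            System.out.println(ret.getNumRows() + " of " + test.getNumRows()
                + " rows kept"); //expect 1 of 2
        }
    }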
