[SYSTEMML-613] Refactoring transform encoders (api, simplify, cleanup) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/d9e0748b Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/d9e0748b Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/d9e0748b
Branch: refs/heads/master Commit: d9e0748b895f6ef7d8f22542c0630036e12b596f Parents: c8c8d81 Author: Matthias Boehm <[email protected]> Authored: Mon Apr 4 20:54:51 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Apr 5 00:18:17 2016 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/api/jmlc/Connection.java | 12 +- .../runtime/transform/ApplyTfCSVSPARK.java | 1 - .../sysml/runtime/transform/BinAgent.java | 147 ++++++-------- .../sysml/runtime/transform/DataTransform.java | 83 ++++---- .../sysml/runtime/transform/DistinctValue.java | 31 ++- .../sysml/runtime/transform/DummycodeAgent.java | 138 +++++--------- .../sysml/runtime/transform/MVImputeAgent.java | 191 ++++++++----------- .../sysml/runtime/transform/OmitAgent.java | 71 ++----- .../sysml/runtime/transform/RecodeAgent.java | 154 ++++++--------- .../apache/sysml/runtime/transform/TfUtils.java | 114 ++++++----- .../runtime/transform/TransformationAgent.java | 87 --------- .../sysml/runtime/transform/decode/Decoder.java | 3 +- .../transform/decode/DecoderComposite.java | 3 +- .../transform/decode/DecoderFactory.java | 13 +- .../runtime/transform/decode/DecoderRecode.java | 3 +- .../sysml/runtime/transform/encode/Encoder.java | 104 ++++++++++ .../functions/transform/ScalingTest.java | 5 +- 17 files changed, 501 insertions(+), 659 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/api/jmlc/Connection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java index abb22ed..00477cf 100644 --- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java +++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java @@ -63,8 +63,6 @@ import 
org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.TfUtils; -import org.apache.sysml.runtime.transform.TransformationAgent; -import org.apache.sysml.runtime.transform.TransformationAgent.TX_METHOD; import org.apache.sysml.runtime.transform.decode.DecoderRecode; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.MapReduceTool; @@ -462,14 +460,14 @@ public class Connection //parse json transform specification JSONObject jSpec = new JSONObject(spec); - if ( jSpec.containsKey(TX_METHOD.RECODE.toString())) { + if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { JSONArray attrs = null; //TODO simplify once json spec consolidated - if( jSpec.get(TX_METHOD.RECODE.toString()) instanceof JSONObject ) { - JSONObject obj = (JSONObject) jSpec.get(TX_METHOD.RECODE.toString()); - attrs = (JSONArray) obj.get(TransformationAgent.JSON_ATTRS); + if( jSpec.get(TfUtils.TXMETHOD_RECODE) instanceof JSONObject ) { + JSONObject obj = (JSONObject) jSpec.get(TfUtils.TXMETHOD_RECODE); + attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); } else - attrs = (JSONArray)jSpec.get(TX_METHOD.RECODE.toString()); + attrs = (JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE); for(int j=0; j<attrs.length(); j++) specRecodeIDs.add(UtilFunctions.toInt(attrs.get(j))); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java index 9b4f70c..c1d99e6 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java @@ -124,7 +124,6 @@ public 
class ApplyTfCSVSPARK { _tfmapper.processHeaderLine(); if (_tfmapper.hasHeader() ) { - //outLines.add(dcdHeader); // if the header needs to be preserved in the output file continue; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java index 8a7199e..4f24afd 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java @@ -41,68 +41,67 @@ import org.apache.wink.json4j.JSONObject; import scala.Tuple2; import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; +import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; -public class BinAgent extends TransformationAgent { - +public class BinAgent extends Encoder +{ private static final long serialVersionUID = 1917445005206076078L; public static final String MIN_PREFIX = "min"; public static final String MAX_PREFIX = "max"; public static final String NBINS_PREFIX = "nbins"; - private int[] _binList = null; - //private byte[] _binMethodList = null; // Not used, since only equi-width is supported for now. 
private int[] _numBins = null; - private double[] _min=null, _max=null; // min and max among non-missing values - private double[] _binWidths = null; // width of a bin for each attribute - BinAgent() { } + public BinAgent() { + super( null ); + } - BinAgent(JSONObject parsedSpec) throws JSONException { - - if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) ) + public BinAgent(JSONObject parsedSpec) + throws JSONException + { + super( null ); + if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) ) return; - JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString()); - - JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); - //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD); - JSONArray nbins = (JSONArray) obj.get(JSON_NBINS); + JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN); + JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); + JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS); - assert(attrs.size() == nbins.size()); - - _binList = new int[attrs.size()]; + initColList(attrs); _numBins = new int[attrs.size()]; - for(int i=0; i < _binList.length; i++) { - _binList[i] = UtilFunctions.toInt(attrs.get(i)); + for(int i=0; i < _numBins.length; i++) _numBins[i] = UtilFunctions.toInt(nbins.get(i)); - } // initialize internal transformation metadata - _min = new double[_binList.length]; + _min = new double[_colList.length]; Arrays.fill(_min, Double.MAX_VALUE); - _max = new double[_binList.length]; + _max = new double[_colList.length]; Arrays.fill(_max, -Double.MAX_VALUE); - _binWidths = new double[_binList.length]; + _binWidths = new double[_colList.length]; } + + public int[] getNumBins() { return _numBins; } + public double[] getMin() { return _min; } + public double[] getBinWidths() { return _binWidths; } public void prepare(String[] words, TfUtils agents) { - if ( _binList == null ) + if ( !isApplicable() ) return; - for(int i=0; i <_binList.length; i++) { - int colID = _binList[i]; + for(int i=0; i <_colList.length; i++) 
{ + int colID = _colList[i]; String w = null; double d = 0; // equi-width w = UtilFunctions.unquote(words[colID-1].trim()); - if(!agents.isNA(w)) { + if(!TfUtils.isNA(agents.getNAStrings(),w)) { d = UtilFunctions.parseToDouble(w); if(d < _min[i]) _min[i] = d; @@ -136,12 +135,12 @@ public class BinAgent extends TransformationAgent { */ @Override public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { - if ( _binList == null ) + if( !isApplicable() ) return; try { - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; IntWritable iw = new IntWritable(-colID); out.collect(iw, prepMinOutput(i)); @@ -154,12 +153,12 @@ public class BinAgent extends TransformationAgent { } public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { - if ( _binList == null ) + if ( !isApplicable() ) return list; try { - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; Integer iw = -colID; list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) ); @@ -174,7 +173,7 @@ public class BinAgent extends TransformationAgent { private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { - Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX); + Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX); BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); br.close(); @@ -225,15 +224,15 @@ public class BinAgent 
extends TransformationAgent { public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException { - if(_binList == null) + if( !isApplicable() ) return; MVImputeAgent mvagent = agents.getMVImputeAgent(); - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; // If the column is imputed with a constant, then adjust min and max based the value of the constant. - if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) + if ( mvagent.isApplicable(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) { double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) ); if ( cst < _min[i]) @@ -249,11 +248,6 @@ public class BinAgent extends TransformationAgent { // ------------------------------------------------------------------------------------------------ - public int[] getBinList() { return _binList; } - public int[] getNumBins() { return _numBins; } - public double[] getMin() { return _min; } - public double[] getBinWidths() { return _binWidths; } - /** * Method to load transform metadata for all attributes * @@ -262,14 +256,14 @@ public class BinAgent extends TransformationAgent { */ @Override public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - if ( _binList == null ) + if( !isApplicable() ) return; if(fs.isDirectory(txMtdDir)) { - for(int i=0; i<_binList.length;i++) { - int colID = _binList[i]; + for(int i=0; i<_colList.length;i++) { + int colID = _colList[i]; - Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX); + Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); @@ -300,56 +294,27 @@ public class BinAgent extends TransformationAgent { * 
@return */ @Override - public String[] apply(String[] words, TfUtils agents) { - if ( _binList == null ) + public String[] apply(String[] words) { + if( !isApplicable() ) return words; - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; - + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; try { - double val = UtilFunctions.parseToDouble(words[colID-1]); - int binid = 1; - double tmp = _min[i] + _binWidths[i]; - while(val > tmp && binid < _numBins[i]) { - tmp += _binWidths[i]; - binid++; - } - words[colID-1] = Integer.toString(binid); - } catch(NumberFormatException e) - { + double val = UtilFunctions.parseToDouble(words[colID-1]); + int binid = 1; + double tmp = _min[i] + _binWidths[i]; + while(val > tmp && binid < _numBins[i]) { + tmp += _binWidths[i]; + binid++; + } + words[colID-1] = Integer.toString(binid); + } + catch(NumberFormatException e) { throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method."); } } return words; } - - /** - * Check if the given column ID is subjected to this transformation. - * - */ - public int isBinned(int colID) - { - if(_binList == null) - return -1; - - int idx = Arrays.binarySearch(_binList, colID); - return ( idx >= 0 ? 
idx : -1); - } - - - @Override - public void print() { - System.out.print("Binning List (Equi-width): \n "); - for(int i : _binList) { - System.out.print(i + " "); - } - System.out.print("\n "); - for(int b : _numBins) { - System.out.print(b + " "); - } - System.out.println(); - } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java index 16f6228..a88923f 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java @@ -78,7 +78,6 @@ import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; -import org.apache.sysml.runtime.transform.TransformationAgent.TX_METHOD; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.utils.JSONHelper; @@ -237,8 +236,8 @@ public class DataTransform // -------------------------------------------------------------------------- // Omit - if( inputSpec.containsKey(TX_METHOD.OMIT.toString()) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TX_METHOD.OMIT.toString()); + if( inputSpec.containsKey(TfUtils.TXMETHOD_OMIT) ) { + JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_OMIT); omitList = new int[arrtmp.size()]; for(int i=0; i<arrtmp.size(); i++) { if(byPositions) @@ -254,8 +253,8 @@ public class DataTransform omitList = null; // -------------------------------------------------------------------------- // Missing value imputation - if( 
inputSpec.containsKey(TX_METHOD.IMPUTE.toString()) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TX_METHOD.IMPUTE.toString()); + if( inputSpec.containsKey(TfUtils.TXMETHOD_IMPUTE) ) { + JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_IMPUTE); mvList = new int[arrtmp.size()]; mvMethods = new byte[arrtmp.size()]; @@ -306,8 +305,8 @@ public class DataTransform mvList = null; // -------------------------------------------------------------------------- // Recoding - if( inputSpec.containsKey(TX_METHOD.RECODE.toString()) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TX_METHOD.RECODE.toString()); + if( inputSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) { + JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_RECODE); rcdList = new int[arrtmp.size()]; for(int i=0; i<arrtmp.size(); i++) { if (byPositions) @@ -323,8 +322,8 @@ public class DataTransform rcdList = null; // -------------------------------------------------------------------------- // Binning - if( inputSpec.containsKey(TX_METHOD.BIN.toString()) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TX_METHOD.BIN.toString()); + if( inputSpec.containsKey(TfUtils.TXMETHOD_BIN) ) { + JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_BIN); binList = new int[arrtmp.size()]; binMethods = new byte[arrtmp.size()]; @@ -349,7 +348,7 @@ public class DataTransform throw new IOException("Unknown missing value imputation method (" + stmp + ") in transformation specification: " + specWithNames); binMethods[i] = btmp; - numBins[i] = entry.get(TransformationAgent.JSON_NBINS); + numBins[i] = entry.get(TfUtils.JSON_NBINS); if ( ((Integer) numBins[i]).intValue() <= 1 ) throw new IllegalArgumentException("Invalid transformation on column \"" + (String) entry.get(NAME) + "\". 
Number of bins must be greater than 1."); } @@ -371,8 +370,8 @@ public class DataTransform binList = null; // -------------------------------------------------------------------------- // Dummycoding - if( inputSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TX_METHOD.DUMMYCODE.toString()); + if( inputSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) { + JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_DUMMYCODE); dcdList = new int[arrtmp.size()]; for(int i=0; i<arrtmp.size(); i++) { if (byPositions) @@ -388,8 +387,8 @@ public class DataTransform dcdList = null; // -------------------------------------------------------------------------- // Scaling - if(inputSpec.containsKey(TX_METHOD.SCALE.toString()) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TX_METHOD.SCALE.toString()); + if(inputSpec.containsKey(TfUtils.TXMETHOD_SCALE) ) { + JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_SCALE); scaleList = new int[arrtmp.size()]; scaleMethods = new byte[arrtmp.size()]; @@ -535,55 +534,55 @@ public class DataTransform if (omitList != null) { JSONObject rcdSpec = new JSONObject(); - rcdSpec.put(TransformationAgent.JSON_ATTRS, toJSONArray(omitList)); - outputSpec.put(TX_METHOD.OMIT.toString(), rcdSpec); + rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(omitList)); + outputSpec.put(TfUtils.TXMETHOD_OMIT, rcdSpec); } if (mvList != null) { JSONObject mvSpec = new JSONObject(); - mvSpec.put(TransformationAgent.JSON_ATTRS, toJSONArray(mvList)); - mvSpec.put(TransformationAgent.JSON_MTHD, toJSONArray(mvMethods)); - mvSpec.put(TransformationAgent.JSON_CONSTS, toJSONArray(mvConstants)); - outputSpec.put(TX_METHOD.IMPUTE.toString(), mvSpec); + mvSpec.put(TfUtils.JSON_ATTRS, toJSONArray(mvList)); + mvSpec.put(TfUtils.JSON_MTHD, toJSONArray(mvMethods)); + mvSpec.put(TfUtils.JSON_CONSTS, toJSONArray(mvConstants)); + outputSpec.put(TfUtils.TXMETHOD_IMPUTE, mvSpec); } if (rcdList != null) { JSONObject rcdSpec = 
new JSONObject(); - rcdSpec.put(TransformationAgent.JSON_ATTRS, toJSONArray(rcdList)); - outputSpec.put(TX_METHOD.RECODE.toString(), rcdSpec); + rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(rcdList)); + outputSpec.put(TfUtils.TXMETHOD_RECODE, rcdSpec); } if (binList != null) { JSONObject binSpec = new JSONObject(); - binSpec.put(TransformationAgent.JSON_ATTRS, toJSONArray(binList)); - binSpec.put(TransformationAgent.JSON_MTHD, toJSONArray(binMethods)); - binSpec.put(TransformationAgent.JSON_NBINS, toJSONArray(numBins)); - outputSpec.put(TX_METHOD.BIN.toString(), binSpec); + binSpec.put(TfUtils.JSON_ATTRS, toJSONArray(binList)); + binSpec.put(TfUtils.JSON_MTHD, toJSONArray(binMethods)); + binSpec.put(TfUtils.JSON_NBINS, toJSONArray(numBins)); + outputSpec.put(TfUtils.TXMETHOD_BIN, binSpec); } if (dcdList != null) { JSONObject dcdSpec = new JSONObject(); - dcdSpec.put(TransformationAgent.JSON_ATTRS, toJSONArray(dcdList)); - outputSpec.put(TX_METHOD.DUMMYCODE.toString(), dcdSpec); + dcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(dcdList)); + outputSpec.put(TfUtils.TXMETHOD_DUMMYCODE, dcdSpec); } if (scaleList != null) { JSONObject scaleSpec = new JSONObject(); - scaleSpec.put(TransformationAgent.JSON_ATTRS, toJSONArray(scaleList)); - scaleSpec.put(TransformationAgent.JSON_MTHD, toJSONArray(scaleMethods)); - outputSpec.put(TX_METHOD.SCALE.toString(), scaleSpec); + scaleSpec.put(TfUtils.JSON_ATTRS, toJSONArray(scaleList)); + scaleSpec.put(TfUtils.JSON_MTHD, toJSONArray(scaleMethods)); + outputSpec.put(TfUtils.TXMETHOD_SCALE, scaleSpec); } if (mvrcdList != null) { JSONObject mvrcd = new JSONObject(); - mvrcd.put(TransformationAgent.JSON_ATTRS, toJSONArray(mvrcdList)); - outputSpec.put(TX_METHOD.MVRCD.toString(), mvrcd); + mvrcd.put(TfUtils.JSON_ATTRS, toJSONArray(mvrcdList)); + outputSpec.put(TfUtils.TXMETHOD_MVRCD, mvrcd); } // return output spec with IDs @@ -635,11 +634,11 @@ public class DataTransform MapReduceTool.renameFileOnHDFS(tmpPath + "/" + 
TfUtils.TXMTD_DC_COLNAMES, txMtdPath + "/" + TfUtils.TXMTD_DC_COLNAMES); MapReduceTool.renameFileOnHDFS(tmpPath + "/" + TfUtils.TXMTD_COLTYPES, txMtdPath + "/" + TfUtils.TXMTD_COLTYPES); - if ( fs.exists(new Path(tmpPath +"/Dummycode/" + TransformationAgent.DCD_FILE_NAME)) ) + if ( fs.exists(new Path(tmpPath +"/Dummycode/" + TfUtils.DCD_FILE_NAME)) ) { if ( !fs.exists( new Path(txMtdPath + "/Dummycode/") )) fs.mkdirs(new Path(txMtdPath + "/Dummycode/")); - MapReduceTool.renameFileOnHDFS( tmpPath + "/Dummycode/" + TransformationAgent.DCD_FILE_NAME, txMtdPath + "/Dummycode/" + TransformationAgent.DCD_FILE_NAME); + MapReduceTool.renameFileOnHDFS( tmpPath + "/Dummycode/" + TfUtils.DCD_FILE_NAME, txMtdPath + "/Dummycode/" + TfUtils.DCD_FILE_NAME); } } @@ -666,17 +665,17 @@ public class DataTransform br.close(); // fetch relevant attribute lists - if ( !spec.containsKey(TX_METHOD.DUMMYCODE.toString()) ) + if ( !spec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) return ret; - JSONArray dcdList = (JSONArray) ((JSONObject)spec.get(TX_METHOD.DUMMYCODE.toString())).get(TransformationAgent.JSON_ATTRS); + JSONArray dcdList = (JSONArray) ((JSONObject)spec.get(TfUtils.TXMETHOD_DUMMYCODE)).get(TfUtils.JSON_ATTRS); // look for numBins among binned columns for(Object o : dcdList) { int id = UtilFunctions.toInt(o); - Path binpath = new Path( tfMtdPath + "/Bin/" + UtilFunctions.unquote(columnNames[id-1]) + TransformationAgent.BIN_FILE_SUFFIX); + Path binpath = new Path( tfMtdPath + "/Bin/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.BIN_FILE_SUFFIX); Path rcdpath = new Path( tfMtdPath + "/Recode/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_RCD_DISTINCT_SUFFIX); if ( TfUtils.checkValidInputFile(fs, binpath, false ) ) @@ -1063,7 +1062,7 @@ public class DataTransform ret = new MatrixBlock(input.getNumRows(), input.getNumColumns(), false); Iterator<String[]> iter = input.getStringRowIterator(); for( int i=0; iter.hasNext(); i++ ) { - String[] tmp = 
agents.apply(iter.next(), true); + String[] tmp = agents.apply(iter.next()); for( int j=0; j<tmp.length; j++ ) ret.appendValue(i, j, UtilFunctions.parseToDouble(tmp[j])); } @@ -1227,7 +1226,7 @@ public class DataTransform _ba.loadTxMtd(job, fs, tmp, agents); _da.setRecodeMapsCP( _ra.getCPRecodeMaps() ); - _da.setNumBins(_ba.getBinList(), _ba.getNumBins()); + _da.setNumBins(_ba.getColList(), _ba.getNumBins()); _da.loadTxMtd(job, fs, tmp, agents); } else { @@ -1247,7 +1246,7 @@ public class DataTransform _ba.loadTxMtd(job, fs, tmp, agents); _da.setRecodeMaps( _ra.getRecodeMaps() ); - _da.setNumBins(_ba.getBinList(), _ba.getNumBins()); + _da.setNumBins(_ba.getColList(), _ba.getNumBins()); _da.loadTxMtd(job, fs, tmp, agents); } @@ -1296,7 +1295,7 @@ public class DataTransform if(!agents.omit(words)) { - words = agents.apply(words, !isApply); + words = agents.apply(words); if (isCSV) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java index 2e52657..c35e160 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java @@ -59,8 +59,7 @@ public class DistinctValue implements Writable, Serializable { _count = count; } - public DistinctValue(OffsetCount oc) throws CharacterCodingException - { + public DistinctValue(OffsetCount oc) throws CharacterCodingException { this(oc.filename + "," + oc.fileOffset, oc.count); } @@ -70,8 +69,13 @@ public class DistinctValue implements Writable, Serializable { _count = -1; } - public String getWord() { return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); } - public long getCount() { return _count; } + public String getWord() { 
+ return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); + } + + public long getCount() { + return _count; + } @Override public void write(DataOutput out) throws IOException { @@ -85,24 +89,17 @@ public class DistinctValue implements Writable, Serializable { @Override public void readFields(DataInput in) throws IOException { // read word - int newLength = WritableUtils.readVInt(in); - _bytes = new byte[newLength]; - in.readFully(_bytes, 0, newLength); - _length = newLength; - if (_length != _bytes.length) - System.out.println("ERROR in DistinctValue.readFields()"); + _length = WritableUtils.readVInt(in); + _bytes = new byte[_length]; + in.readFully(_bytes, 0, _length); // read count _count = in.readLong(); } public OffsetCount getOffsetCount() { - OffsetCount oc = new OffsetCount(); String[] parts = getWord().split(","); - oc.filename = parts[0]; - oc.fileOffset = UtilFunctions.parseToLong(parts[1]); - oc.count = getCount(); - - return oc; + return new OffsetCount( parts[0], + UtilFunctions.parseToLong(parts[1]), + getCount() ); } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java index e623ab5..9ceb368 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java @@ -41,13 +41,13 @@ import org.apache.wink.json4j.JSONObject; import com.google.common.base.Functions; import com.google.common.collect.Ordering; +import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; -public class DummycodeAgent extends TransformationAgent { - +public class DummycodeAgent extends Encoder +{ private static final long 
serialVersionUID = 5832130477659116489L; - private int[] _dcdList = null; private long numCols = 0; private HashMap<Integer, HashMap<String,String>> _finalMaps = null; @@ -59,26 +59,18 @@ public class DummycodeAgent extends TransformationAgent { private int[] _dcdColumnMap = null; // to help in translating between original and dummycoded column IDs private long _dummycodedLength = 0; // #of columns after dummycoded - DummycodeAgent(int[] list) { - _dcdList = list; + public DummycodeAgent(int[] list) { + super(list); } - DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException { + public DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException { + super(null); numCols = ncol; - if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) ) - return; - - JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.DUMMYCODE.toString()); - JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); - - _dcdList = new int[attrs.size()]; - for(int i=0; i < _dcdList.length; i++) - _dcdList[i] = UtilFunctions.toInt(attrs.get(i)); - } - - public int[] dcdList() { - return _dcdList; + if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) + return; + JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_DUMMYCODE); + initColList( (JSONArray)obj.get(TfUtils.JSON_ATTRS) ); } /** @@ -137,26 +129,26 @@ public class DummycodeAgent extends TransformationAgent { public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException { // initialize all column types in the transformed data to SCALE - ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength]; + TfUtils.ColumnTypes[] ctypes = new TfUtils.ColumnTypes[(int) _dummycodedLength]; for(int i=0; i < _dummycodedLength; i++) - ctypes[i] = ColumnTypes.SCALE; + ctypes[i] = TfUtils.ColumnTypes.SCALE; _dcdColumnMap = new int[numCols]; - Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME); + Path pt=new Path(txMtdDir+"/Dummycode/" + TfUtils.DCD_FILE_NAME); 
BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); int sum=1; int idx = 0; for(int colID=1; colID <= numCols; colID++) { - if ( _dcdList != null && idx < _dcdList.length && _dcdList[idx] == colID ) + if ( _colList != null && idx < _colList.length && _colList[idx] == colID ) { br.write(colID + TfUtils.TXMTD_SEP + "1" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + (sum+_domainSizes[idx]-1) + "\n"); _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1; for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++) - ctypes[i-1] = ColumnTypes.DUMMYCODED; + ctypes[i-1] = TfUtils.ColumnTypes.DUMMYCODED; sum += _domainSizes[idx]; idx++; @@ -166,11 +158,11 @@ public class DummycodeAgent extends TransformationAgent { br.write(colID + TfUtils.TXMTD_SEP + "0" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + sum + "\n"); _dcdColumnMap[colID-1] = sum-1; - if ( agents.getBinAgent().isBinned(colID) != -1 ) - ctypes[sum-1] = ColumnTypes.ORDINAL; // binned variable results in an ordinal column + if ( agents.getBinAgent().isApplicable(colID) != -1 ) + ctypes[sum-1] = TfUtils.ColumnTypes.ORDINAL; // binned variable results in an ordinal column - if ( agents.getRecodeAgent().isRecoded(colID) != -1 ) - ctypes[sum-1] = ColumnTypes.NOMINAL; + if ( agents.getRecodeAgent().isApplicable(colID) != -1 ) + ctypes[sum-1] = TfUtils.ColumnTypes.NOMINAL; sum += 1; } @@ -181,9 +173,9 @@ public class DummycodeAgent extends TransformationAgent { pt=new Path(txMtdDir + File.separator + TfUtils.TXMTD_COLTYPES); br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(columnTypeToID(ctypes[0]) + ""); + br.write(ctypes[0].toID() + ""); for(int i = 1; i < _dummycodedLength; i++) - br.write( TfUtils.TXMTD_SEP + columnTypeToID(ctypes[i])); + br.write( TfUtils.TXMTD_SEP + ctypes[i].toID() ); br.close(); return sum-1; @@ -211,7 +203,7 @@ public class DummycodeAgent extends TransformationAgent { public String constructDummycodedHeader(String header, Pattern delim) { - 
if(_dcdList == null && _binList == null ) + if(_colList == null && _binList == null ) // none of the columns are dummycoded, simply return the given header return header; @@ -223,11 +215,11 @@ public class DummycodeAgent extends TransformationAgent { // Dummycoding can be performed on either on a recoded column or on a binned column // process recoded columns - if(_finalMapsCP != null && _dcdList != null) + if(_finalMapsCP != null && _colList != null) { - for(int i=0; i <_dcdList.length; i++) + for(int i=0; i <_colList.length; i++) { - int colID = _dcdList[i]; + int colID = _colList[i]; HashMap<String,Long> map = _finalMapsCP.get(colID); String colName = UtilFunctions.unquote(names[colID-1]); @@ -242,17 +234,17 @@ public class DummycodeAgent extends TransformationAgent { for(int idx=0; idx < newNames.size(); idx++) { if(idx==0) - sb.append( colName + DCD_NAME_SEP + newNames.get(idx)); + sb.append( colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); else - sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx)); + sb.append( delim + colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); } names[colID-1] = sb.toString(); // replace original column name with dcd name } } } - else if(_finalMaps != null && _dcdList != null) { - for(int i=0; i <_dcdList.length; i++) { - int colID = _dcdList[i]; + else if(_finalMaps != null && _colList != null) { + for(int i=0; i <_colList.length; i++) { + int colID = _colList[i]; HashMap<String,String> map = _finalMaps.get(colID); String colName = UtilFunctions.unquote(names[colID-1]); @@ -272,9 +264,9 @@ public class DummycodeAgent extends TransformationAgent { for(int idx=0; idx < newNames.size(); idx++) { if(idx==0) - sb.append( colName + DCD_NAME_SEP + newNames.get(idx)); + sb.append( colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); else - sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx)); + sb.append( delim + colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); } names[colID-1] = sb.toString(); // replace 
original column name with dcd name } @@ -288,7 +280,7 @@ public class DummycodeAgent extends TransformationAgent { int colID = _binList[i]; // need to consider only binned and dummycoded columns - if(isDummyCoded(colID) == -1) + if(isApplicable(colID) == -1) continue; int numBins = _numBins[i]; @@ -297,9 +289,9 @@ public class DummycodeAgent extends TransformationAgent { sb.setLength(0); for(int idx=0; idx < numBins; idx++) if(idx==0) - sb.append( colName + DCD_NAME_SEP + "Bin" + (idx+1) ); + sb.append( colName + TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) ); else - sb.append( delim + colName + DCD_NAME_SEP + "Bin" + (idx+1) ); + sb.append( delim + colName + TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) ); names[colID-1] = sb.toString(); // replace original column name with dcd name } @@ -319,21 +311,20 @@ public class DummycodeAgent extends TransformationAgent { @Override public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - if ( _dcdList == null ) - { + if ( !isApplicable() ) { _dummycodedLength = numCols; return; } // sort to-be dummycoded column IDs in ascending order. This is the order in which the new dummycoded record is constructed in apply() function. 
- Arrays.sort(_dcdList); - _domainSizes = new int[_dcdList.length]; + Arrays.sort(_colList); + _domainSizes = new int[_colList.length]; _dummycodedLength = numCols; //HashMap<String, String> map = null; - for(int i=0; i<_dcdList.length; i++) { - int colID = _dcdList[i]; + for(int i=0; i<_colList.length; i++) { + int colID = _colList[i]; // Find the domain size for colID using _finalMaps or _finalMapsCP int domainSize = 0; @@ -372,26 +363,26 @@ public class DummycodeAgent extends TransformationAgent { * @return */ @Override - public String[] apply(String[] words, TfUtils agents) { - - if ( _dcdList == null ) + public String[] apply(String[] words) + { + if( !isApplicable() ) return words; String[] nwords = new String[(int)_dummycodedLength]; - int rcdVal = 0; for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) { - if(idx < _dcdList.length && colID==_dcdList[idx]) { + if(idx < _colList.length && colID==_colList[idx]) { // dummycoded columns try { - rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1])); - nwords[ ncolID-1+rcdVal-1 ] = "1"; - ncolID += _domainSizes[idx]; - idx++; - } catch (Exception e) { - System.out.println("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength); - throw new RuntimeException(e); + rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1])); + nwords[ ncolID-1+rcdVal-1 ] = "1"; + ncolID += _domainSizes[idx]; + idx++; + } + catch (Exception e) { + throw new RuntimeException("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength); } } else { @@ -402,27 +393,4 @@ public class DummycodeAgent extends TransformationAgent { return nwords; } - - /** - * Check if the given column ID is subjected to this transformation. 
- * - */ - public int isDummyCoded(int colID) - { - if(_dcdList == null) - return -1; - - int idx = Arrays.binarySearch(_dcdList, colID); - return ( idx >= 0 ? idx : -1); - } - - @Override - public void print() { - System.out.print("Dummycoding List: \n "); - for(int i : _dcdList) { - System.out.print(i + " "); - } - System.out.println(); - } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java index 9763403..5cadef6 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java @@ -26,7 +26,6 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.nio.charset.CharacterCodingException; import java.util.ArrayList; -import java.util.Arrays; import java.util.BitSet; import java.util.HashMap; import java.util.Iterator; @@ -50,10 +49,11 @@ import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.matrix.operators.CMOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; +import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; -public class MVImputeAgent extends TransformationAgent { - +public class MVImputeAgent extends Encoder +{ private static final long serialVersionUID = 9057868620144662194L; public static final String MEAN_PREFIX = "mean"; @@ -65,7 +65,6 @@ public class MVImputeAgent extends TransformationAgent { public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT }; - private int[] _mvList = null; /* * Imputation Methods: * 1 - 
global_mean @@ -94,6 +93,8 @@ public class MVImputeAgent extends TransformationAgent { private CM_COV_Object[] _scnomvVarList = null; // column-level variances, computed so far private String[] _replacementList = null; // replacements: for global_mean, mean; and for global_mode, recode id of mode category + private String[] _NAstrings = null; + public String[] getReplacements() { return _replacementList; } public KahanObject[] getMeans() { return _meanList; } @@ -101,46 +102,47 @@ public class MVImputeAgent extends TransformationAgent { public KahanObject[] getMeans_scnomv() { return _scnomvMeanList; } public CM_COV_Object[] getVars_scnomv() { return _scnomvVarList; } - MVImputeAgent(JSONObject parsedSpec) throws JSONException { - - boolean isMV = parsedSpec.containsKey(TX_METHOD.IMPUTE.toString()); - boolean isSC = parsedSpec.containsKey(TX_METHOD.SCALE.toString()); + public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings) + throws JSONException + { + super(null); + boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE); + boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE); + _NAstrings = NAstrings; if(!isMV) { // MV Impute is not applicable - _mvList = null; + _colList = null; _mvMethodList = null; _meanList = null; _countList = null; _replacementList = null; } else { - JSONObject mvobj = (JSONObject) parsedSpec.get(TX_METHOD.IMPUTE.toString()); - JSONArray mvattrs = (JSONArray) mvobj.get(JSON_ATTRS); - JSONArray mvmthds = (JSONArray) mvobj.get(JSON_MTHD); + JSONObject mvobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE); + JSONArray mvattrs = (JSONArray) mvobj.get(TfUtils.JSON_ATTRS); + JSONArray mvmthds = (JSONArray) mvobj.get(TfUtils.JSON_MTHD); int mvLength = mvattrs.size(); - assert(mvLength == mvmthds.size()); - - _mvList = new int[mvLength]; + _colList = new int[mvLength]; _mvMethodList = new byte[mvLength]; _meanList = new KahanObject[mvLength]; _countList = new long[mvLength]; _varList = new CM_COV_Object[mvLength]; - 
_isMVScaled = new BitSet(_mvList.length); + _isMVScaled = new BitSet(_colList.length); _isMVScaled.clear(); - for(int i=0; i < _mvList.length; i++) { - _mvList[i] = UtilFunctions.toInt(mvattrs.get(i)); + for(int i=0; i < _colList.length; i++) { + _colList[i] = UtilFunctions.toInt(mvattrs.get(i)); _mvMethodList[i] = (byte) UtilFunctions.toInt(mvmthds.get(i)); _meanList[i] = new KahanObject(0, 0); } _replacementList = new String[mvLength]; // contains replacements for all columns (scale and categorical) - JSONArray constants = (JSONArray)mvobj.get(JSON_CONSTS); + JSONArray constants = (JSONArray)mvobj.get(TfUtils.JSON_CONSTS); for(int i=0; i < constants.size(); i++) { if ( constants.get(i) == null ) _replacementList[i] = "NaN"; @@ -159,12 +161,12 @@ public class MVImputeAgent extends TransformationAgent { } else { - if ( _mvList != null ) - _mvscMethodList = new byte[_mvList.length]; + if ( _colList != null ) + _mvscMethodList = new byte[_colList.length]; - JSONObject scobj = (JSONObject) parsedSpec.get(TX_METHOD.SCALE.toString()); - JSONArray scattrs = (JSONArray) scobj.get(JSON_ATTRS); - JSONArray scmthds = (JSONArray) scobj.get(JSON_MTHD); + JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE); + JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS); + JSONArray scmthds = (JSONArray) scobj.get(TfUtils.JSON_MTHD); int scLength = scattrs.size(); int[] _allscaled = new int[scLength]; @@ -178,7 +180,7 @@ public class MVImputeAgent extends TransformationAgent { _allscaled[i] = colID; // check if the attribute is also MV imputed - int mvidx = isImputed(colID); + int mvidx = isApplicable(colID); if(mvidx != -1) { _isMVScaled.set(mvidx); @@ -203,7 +205,7 @@ public class MVImputeAgent extends TransformationAgent { colID = UtilFunctions.toInt(scattrs.get(i)); mthd = (byte)UtilFunctions.toInt(scmthds.get(i)); - if(isImputed(colID) == -1) + if(isApplicable(colID) == -1) { // scaled but not imputed _scnomvList[idx] = colID; _scnomvMethodList[idx] = 
mthd; @@ -216,17 +218,17 @@ public class MVImputeAgent extends TransformationAgent { } } - public void prepare(String[] words, TfUtils agents) throws IOException { + public void prepare(String[] words) throws IOException { try { String w = null; - if(_mvList != null) - for(int i=0; i <_mvList.length; i++) { - int colID = _mvList[i]; + if(_colList != null) + for(int i=0; i <_colList.length; i++) { + int colID = _colList[i]; w = UtilFunctions.unquote(words[colID-1].trim()); try { - if(!agents.isNA(w)) { + if(!TfUtils.isNA(_NAstrings, w)) { _countList[i]++; boolean computeMean = (_mvMethodList[i] == 1 || _isMVScaled.get(i) ); @@ -421,9 +423,9 @@ public class MVImputeAgent extends TransformationAgent { StringBuilder sb = new StringBuilder(); DistinctValue dv = null; - if(_mvList != null) - for(int i=0; i < _mvList.length; i++) { - int colID = _mvList[i]; + if(_colList != null) + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; IntWritable iw = new IntWritable(-colID); dv = prepMeanOutput(taskID, i, sb, false); outDV(iw, dv, out); @@ -476,9 +478,9 @@ public class MVImputeAgent extends TransformationAgent { StringBuilder sb = new StringBuilder(); DistinctValue dv = null; - if(_mvList != null) - for(int i=0; i < _mvList.length; i++) { - int colID = _mvList[i]; + if(_colList != null) + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; Integer iw = -colID; dv = prepMeanOutput(taskID, i, sb, false); addDV(iw, dv, list); @@ -516,7 +518,7 @@ public class MVImputeAgent extends TransformationAgent { private void writeTfMtd(int colID, String mean, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { - Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + MV_FILE_SUFFIX); + Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + TfUtils.MV_FILE_SUFFIX); BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + TfUtils.TXMTD_SEP + mean + "\n"); br.close(); @@ -524,7 +526,7 @@ 
public class MVImputeAgent extends TransformationAgent { private void writeTfMtd(int colID, String mean, String sdev, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { - Path pt=new Path(tfMtdDir+"/Scale/"+ agents.getName(colID) + SCALE_FILE_SUFFIX); + Path pt=new Path(tfMtdDir+"/Scale/"+ agents.getName(colID) + TfUtils.SCALE_FILE_SUFFIX); BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + TfUtils.TXMTD_SEP + mean + TfUtils.TXMTD_SEP + sdev + "\n"); br.close(); @@ -532,7 +534,7 @@ public class MVImputeAgent extends TransformationAgent { private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { - Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX); + Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX); BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); br.close(); @@ -541,9 +543,9 @@ public class MVImputeAgent extends TransformationAgent { public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException { try{ - if (_mvList != null) - for(int i=0; i < _mvList.length; i++) { - int colID = _mvList[i]; + if (_colList != null) + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; double imputedValue = Double.NaN; KahanObject gmean = null; @@ -770,11 +772,11 @@ public class MVImputeAgent extends TransformationAgent { { try { if( totalValidCount != totalRecordCount) { - // In the presense of missing values, the variance needs to be adjusted. + // In the presence of missing values, the variance needs to be adjusted. 
// The mean does not need to be adjusted, when mv impute method is global_mean, // since missing values themselves are replaced with gmean. long totalMissingCount = (totalRecordCount-totalValidCount); - int idx = isImputed(colID); + int idx = isApplicable(colID); if(idx != -1 && _mvMethodList[idx] == 3) _meanFn.execute(gmean, UtilFunctions.parseToDouble(_replacementList[idx]), totalRecordCount); _varFn.execute(gcm, gmean._sum, totalMissingCount); @@ -797,7 +799,7 @@ public class MVImputeAgent extends TransformationAgent { private String readReplacement(int colID, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - Path path = new Path( txMtdDir + "/Impute/" + agents.getName(colID) + MV_FILE_SUFFIX); + Path path = new Path( txMtdDir + "/Impute/" + agents.getName(colID) + TfUtils.MV_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); @@ -810,7 +812,7 @@ public class MVImputeAgent extends TransformationAgent { public String readScaleLine(int colID, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - Path path = new Path( txMtdDir + "/Scale/" + agents.getName(colID) + SCALE_FILE_SUFFIX); + Path path = new Path( txMtdDir + "/Scale/" + agents.getName(colID) + TfUtils.SCALE_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); String line = br.readLine(); @@ -846,9 +848,9 @@ public class MVImputeAgent extends TransformationAgent { if(fs.isDirectory(tfMtdDir)) { // Load information about missing value imputation - if (_mvList != null) - for(int i=0; i<_mvList.length;i++) { - int colID = _mvList[i]; + if (_colList != null) + for(int i=0; i<_colList.length;i++) { + int colID = _colList[i]; if ( _mvMethodList[i] == 1 || _mvMethodList[i] == 2 ) // global_mean or global_mode @@ -862,10 +864,10 @@ public class MVImputeAgent extends TransformationAgent { } // Load scaling 
information - if(_mvList != null) - for(int i=0; i < _mvList.length; i++) + if(_colList != null) + for(int i=0; i < _colList.length; i++) if ( _isMVScaled.get(i) ) - processScalingFile(i, _mvList, _meanList, _varList, fs, tfMtdDir, agents); + processScalingFile(i, _colList, _meanList, _varList, fs, tfMtdDir, agents); if(_scnomvList != null) for(int i=0; i < _scnomvList.length; i++) @@ -884,21 +886,21 @@ public class MVImputeAgent extends TransformationAgent { * @return */ @Override - public String[] apply(String[] words, TfUtils agents) { - - if ( _mvList != null) - for(int i=0; i < _mvList.length; i++) { - int colID = _mvList[i]; - String w = UtilFunctions.unquote(words[colID-1]); - if(agents.isNA(w)) - w = words[colID-1] = _replacementList[i]; - - if ( _isMVScaled.get(i) ) - if ( _mvscMethodList[i] == 1 ) - words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum ); - else - words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum ); - } + public String[] apply(String[] words) + { + if( isApplicable() ) + for(int i=0; i < _colList.length; i++) { + int colID = _colList[i]; + String w = UtilFunctions.unquote(words[colID-1]); + if(TfUtils.isNA(_NAstrings, w)) + w = words[colID-1] = _replacementList[i]; + + if ( _isMVScaled.get(i) ) + if ( _mvscMethodList[i] == 1 ) + words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum ); + else + words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum ); + } if(_scnomvList != null) for(int i=0; i < _scnomvList.length; i++) @@ -913,23 +915,8 @@ public class MVImputeAgent extends TransformationAgent { return words; } - /** - * Check if the given column ID is subjected to this transformation. - * - */ - public int isImputed(int colID) - { - if(_mvList == null) - return -1; - - int idx = Arrays.binarySearch(_mvList, colID); - return ( idx >= 0 ? 
idx : -1); - } - - public MVMethod getMethod(int colID) - { - int idx = isImputed(colID); - + public MVMethod getMethod(int colID) { + int idx = isApplicable(colID); if(idx == -1) return MVMethod.INVALID; @@ -943,35 +930,13 @@ public class MVImputeAgent extends TransformationAgent { } - public long getNonMVCount(int colID) - { - int idx = isImputed(colID); - if(idx == -1) - return 0; - else - return _countList[idx]; + public long getNonMVCount(int colID) { + int idx = isApplicable(colID); + return (idx == -1) ? 0 : _countList[idx]; } - public String getReplacement(int colID) - { - int idx = isImputed(colID); - - if(idx == -1) - return null; - else - return _replacementList[idx]; - } - - public void print() { - System.out.print("MV Imputation List: \n "); - for(int i : _mvList) { - System.out.print(i + " "); - } - System.out.print("\n "); - for(byte b : _mvMethodList) { - System.out.print(b + " "); - } - System.out.println(); + public String getReplacement(int colID) { + int idx = isApplicable(colID); + return (idx == -1) ? 
null : _replacementList[idx]; } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java index bd2feb3..c169129 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java @@ -20,7 +20,6 @@ package org.apache.sysml.runtime.transform; import java.io.IOException; -import java.util.Arrays; import java.util.Iterator; import org.apache.hadoop.fs.FileSystem; @@ -31,71 +30,43 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; - +import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; -public class OmitAgent extends TransformationAgent { - +public class OmitAgent extends Encoder +{ private static final long serialVersionUID = 1978852120416654195L; - private int[] _omitList = null; - - OmitAgent() { } + public OmitAgent() { + super(null); + } - OmitAgent(int[] list) { - _omitList = list; + public OmitAgent(int[] list) { + super(list); } - public OmitAgent(JSONObject parsedSpec) throws JSONException { - if (!parsedSpec.containsKey(TX_METHOD.OMIT.toString())) + public OmitAgent(JSONObject parsedSpec) + throws JSONException + { + super(null); + if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT)) return; - JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.OMIT.toString()); - JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); - - _omitList = new int[attrs.size()]; - for(int i=0; i < _omitList.length; i++) - _omitList[i] = UtilFunctions.toInt(attrs.get(i)); + JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_OMIT); + initColList((JSONArray) obj.get(TfUtils.JSON_ATTRS)); } public boolean omit(String[] words, TfUtils agents) { - if(_omitList == null) + if( !isApplicable() ) return false; - for(int i=0; i<_omitList.length; i++) - { - int colID = _omitList[i]; - if(agents.isNA(UtilFunctions.unquote(words[colID-1].trim()))) + for(int i=0; i<_colList.length; i++) { + int colID = _colList[i]; + if(TfUtils.isNA(agents.getNAStrings(),UtilFunctions.unquote(words[colID-1].trim()))) return true; } return false; } - - public boolean isApplicable() - { - return (_omitList != null); - } - - /** - * Check if the given column ID is subjected to this transformation. - * - */ - public int isOmitted(int colID) - { - if(_omitList == null) - return -1; - - int idx = Arrays.binarySearch(_omitList, colID); - return ( idx >= 0 ? idx : -1); - } - - @Override - public void print() { - System.out.print("Omit List: \n "); - for(int i : _omitList) - System.out.print(i + " "); - System.out.println(); - } @Override public void mapOutputTransformationMetadata( @@ -115,10 +86,8 @@ public class OmitAgent extends TransformationAgent { } @Override - public String[] apply(String[] words, TfUtils agents) { + public String[] apply(String[] words) { return null; } - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java index aa7c22e..1f9749d 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.util.ArrayList; -import 
java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -48,44 +47,42 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; import org.apache.sysml.runtime.transform.decode.DecoderRecode; +import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.util.UtilFunctions; -public class RecodeAgent extends TransformationAgent { - +public class RecodeAgent extends Encoder +{ private static final long serialVersionUID = 8213163881283341874L; - private int[] _rcdList = null; private int[] _mvrcdList = null; private int[] _fullrcdList = null; // HashMap< columnID, HashMap<distinctValue, count> > private HashMap<Integer, HashMap<String, Long>> _rcdMaps = new HashMap<Integer, HashMap<String, Long>>(); + private HashMap<Integer, HashMap<String,String>> _finalMaps = null; - RecodeAgent(JSONObject parsedSpec) throws JSONException { - - int rcdCount = 0; - - if ( parsedSpec.containsKey(TX_METHOD.RECODE.toString())) + public RecodeAgent(JSONObject parsedSpec) + throws JSONException + { + super(null); + int rcdCount = 0; + if ( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { //TODO consolidate external and internal json spec definitions JSONArray attrs = null; - if( parsedSpec.get(TX_METHOD.RECODE.toString()) instanceof JSONObject ) { - JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.RECODE.toString()); - attrs = (JSONArray) obj.get(JSON_ATTRS); + if( parsedSpec.get(TfUtils.TXMETHOD_RECODE) instanceof JSONObject ) { + JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_RECODE); + attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); } else - attrs = (JSONArray)parsedSpec.get(TX_METHOD.RECODE.toString()); - - _rcdList = new int[attrs.size()]; - for(int i=0; i < _rcdList.length; i++) - _rcdList[i] = UtilFunctions.toInt(attrs.get(i)); - rcdCount = _rcdList.length; + attrs = 
(JSONArray)parsedSpec.get(TfUtils.TXMETHOD_RECODE); + rcdCount = initColList(attrs); } - if ( parsedSpec.containsKey(TX_METHOD.MVRCD.toString())) + if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) { - JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.MVRCD.toString()); - JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); + JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_MVRCD); + JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); _mvrcdList = new int[attrs.size()]; for(int i=0; i < _mvrcdList.length; i++) @@ -97,9 +94,9 @@ public class RecodeAgent extends TransformationAgent { { _fullrcdList = new int[rcdCount]; int idx = -1; - if(_rcdList != null) - for(int i=0; i < _rcdList.length; i++) - _fullrcdList[++idx] = _rcdList[i]; + if(_colList != null) + for(int i=0; i < _colList.length; i++) + _fullrcdList[++idx] = _colList[i]; if(_mvrcdList != null) for(int i=0; i < _mvrcdList.length; i++) @@ -114,8 +111,8 @@ public class RecodeAgent extends TransformationAgent { * @param frame */ public void initRecodeMaps( FrameBlock frame ) { - for( int j=0; j<_rcdList.length; j++ ) { - int colID = _rcdList[j]; //1-based + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based HashMap<String,Long> map = new HashMap<String,Long>(); for( int i=0; i<frame.getNumRows(); i++ ) { String[] tmp = frame.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX); @@ -125,8 +122,16 @@ public class RecodeAgent extends TransformationAgent { } } + public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { + return _rcdMaps; + } + + public HashMap<Integer, HashMap<String,String>> getRecodeMaps() { + return _finalMaps; + } + void prepare(String[] words, TfUtils agents) { - if ( _rcdList == null && _mvrcdList == null ) + if ( _colList == null && _mvrcdList == null ) return; String w = null; @@ -183,7 +188,7 @@ public class RecodeAgent extends TransformationAgent { } public void mapOutputHelper(int taskID, OutputCollector<IntWritable, 
DistinctValue> out, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { - if ( _rcdList == null && _mvrcdList == null ) + if ( _colList == null && _mvrcdList == null ) return; try @@ -241,7 +246,7 @@ public class RecodeAgent extends TransformationAgent { int rcdIndex = 0, modeIndex = 0; long maxCount = Long.MIN_VALUE; - boolean isRecoded = (isRecoded(colID) != -1); + boolean isRecoded = (isApplicable(colID) != -1); boolean isModeImputed = (mvagent.getMethod(colID) == MVMethod.GLOBAL_MODE); Path pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) + TfUtils.TXMTD_RCD_MAP_SUFFIX); @@ -293,7 +298,7 @@ public class RecodeAgent extends TransformationAgent { if ( isRecoded ) { // output mode - pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) + MODE_FILE_SUFFIX); + pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) + TfUtils.MODE_FILE_SUFFIX); br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(UtilFunctions.quote(mode) + "," + modeIndex + "," + maxCount ); br.close(); @@ -307,7 +312,7 @@ public class RecodeAgent extends TransformationAgent { if (isModeImputed) { - pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) + MV_FILE_SUFFIX); + pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) + TfUtils.MV_FILE_SUFFIX); br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + "," + UtilFunctions.quote(mode)); br.close(); @@ -316,7 +321,7 @@ public class RecodeAgent extends TransformationAgent { } public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException { - if(_rcdList == null && _mvrcdList == null ) + if(_colList == null && _mvrcdList == null ) return; for(int i=0; i<_fullrcdList.length; i++) { @@ -356,15 +361,6 @@ public class RecodeAgent extends TransformationAgent { writeMetadata(map, outputDir, colID, fs, agents, false); } - // 
------------------------------------------------------------------------------------------------ - - public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { return _rcdMaps; } - - HashMap<Integer, HashMap<String,String>> _finalMaps = null; - public HashMap<Integer, HashMap<String,String>> getRecodeMaps() { - return _finalMaps; - } - /** * Method to load recode maps of all attributes, at once. * @@ -373,14 +369,14 @@ public class RecodeAgent extends TransformationAgent { */ @Override public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - if ( _rcdList == null ) + if( !isApplicable() ) return; _finalMaps = new HashMap<Integer, HashMap<String, String>>(); if(fs.isDirectory(txMtdDir)) { - for(int i=0; i<_rcdList.length;i++) { - int colID = _rcdList[i]; + for(int i=0; i<_colList.length;i++) { + int colID = _colList[i]; Path path = new Path( txMtdDir + "/Recode/" + agents.getName(colID) + TfUtils.TXMTD_RCD_MAP_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); @@ -404,22 +400,7 @@ public class RecodeAgent extends TransformationAgent { fs.close(); throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir); } - } - - /** - * Check if the given column ID is subjected to this transformation. - * - */ - public int isRecoded(int colID) - { - if(_rcdList == null) - return -1; - - int idx = Arrays.binarySearch(_rcdList, colID); - return ( idx >= 0 ? idx : -1); - } - - + } /** * Method to apply transformations. 
@@ -428,16 +409,17 @@ public class RecodeAgent extends TransformationAgent { * @return */ @Override - public String[] apply(String[] words, TfUtils agents) { - if ( _rcdList == null ) + public String[] apply(String[] words) + { + if( !isApplicable() ) return words; //apply recode maps on relevant columns of given row - for(int i=0; i < _rcdList.length; i++) { + for(int i=0; i < _colList.length; i++) { //prepare input and get code - int colID = _rcdList[i]; + int colID = _colList[i]; String key = UtilFunctions.unquote(words[colID-1].trim()); - String val = _finalMaps.get(colID).get(key); + String val = lookupRCDMap(colID, key); // replace unseen keys with NaN words[colID-1] = (val!=null) ? val : "NaN"; } @@ -447,43 +429,17 @@ public class RecodeAgent extends TransformationAgent { /** * - * @param words - * @param agents + * @param colID + * @param key * @return */ - public String[] cp_apply(String[] words, TfUtils agents) { - if ( _rcdList == null ) - return words; - - //apply recode maps on relevant columns of given row - for(int i=0; i < _rcdList.length; i++) { - //prepare input and get code - int colID = _rcdList[i]; - String key = UtilFunctions.unquote(words[colID-1].trim()); - Long val = _rcdMaps.get(colID).get(key); - // replace unseen keys with NaN - words[colID-1] = (val!=null) ? Long.toString(val) : "NaN"; - } - - return words; - } - - public void printMaps() { - for(Integer k : _rcdMaps.keySet()) { - System.out.println("Column " + k); - HashMap<String,Long> map = _rcdMaps.get(k); - for(String w : map.keySet()) { - System.out.println(" " + w + " : " + map.get(w)); - } - } - } - - public void print() { - System.out.print("Recoding List: \n "); - for(int i : _rcdList) { - System.out.print(i + " "); + private String lookupRCDMap(int colID, String key) { + if( _finalMaps!=null ) + return _finalMaps.get(colID).get(key); + else { //used for cp + Long tmp = _rcdMaps.get(colID).get(key); + return (tmp!=null) ? 
Long.toString(tmp) : null; } - System.out.println(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java index bc08a3a..16ccf6e 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java +++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java @@ -53,6 +53,36 @@ public class TfUtils implements Serializable{ private static final long serialVersionUID = 526252850872633125L; + protected enum ColumnTypes { + SCALE, + NOMINAL, + ORDINAL, + DUMMYCODED, + INVALID; + + protected byte toID() { + switch(this) { + case SCALE: return 1; + case NOMINAL: return 2; + case ORDINAL: return 3; + // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically. 
+ case DUMMYCODED: return 1; + default: + throw new RuntimeException("Invalid Column Type: " + this); + } + } + } + + //transform methods + public static final String TXMETHOD_IMPUTE = "impute"; + public static final String TXMETHOD_RECODE = "recode"; + public static final String TXMETHOD_BIN = "bin"; + public static final String TXMETHOD_DUMMYCODE = "dummycode"; + public static final String TXMETHOD_SCALE = "scale"; + public static final String TXMETHOD_OMIT = "omit"; + public static final String TXMETHOD_MVRCD = "mvrcd"; + + //transform meta data constants public static final String TXMTD_SEP = ","; public static final String TXMTD_COLTYPES = "coltypes.csv"; public static final String TXMTD_COLNAMES = "column.names"; @@ -60,6 +90,17 @@ public class TfUtils implements Serializable{ public static final String TXMTD_RCD_MAP_SUFFIX = ".map"; public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct"; + public static final String JSON_ATTRS = "attributes"; + public static final String JSON_MTHD = "methods"; + public static final String JSON_CONSTS = "constants"; + public static final String JSON_NBINS = "numbins"; + protected static final String MV_FILE_SUFFIX = ".impute"; + protected static final String MODE_FILE_SUFFIX = ".mode"; + protected static final String BIN_FILE_SUFFIX = ".bin"; + protected static final String SCALE_FILE_SUFFIX = ".scale"; + protected static final String DCD_FILE_NAME = "dummyCodeMaps.csv"; + protected static final String DCD_NAME_SEP = "_"; + private OmitAgent _oa = null; private MVImputeAgent _mia = null; @@ -136,7 +177,7 @@ public class TfUtils implements Serializable{ { //TODO recodemaps handover _numInputCols = inNcol; - createAgents(spec); + createAgents(spec, new String[]{}); } protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err) @@ -200,9 +241,11 @@ public class TfUtils implements Serializable{ return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS)); } - private void 
createAgents(JSONObject spec) throws IOException, JSONException { + private void createAgents(JSONObject spec, String[] naStrings) + throws IOException, JSONException + { _oa = new OmitAgent(spec); - _mia = new MVImputeAgent(spec); + _mia = new MVImputeAgent(spec, naStrings); _ra = new RecodeAgent(spec); _ba = new BinAgent(spec); _da = new DummycodeAgent(spec, _numInputCols); @@ -242,7 +285,7 @@ public class TfUtils implements Serializable{ _outputPath = outputPath; parseColumnNames(); - createAgents(spec); + createAgents(spec, naStrings); } public void incrValid() { _numValidRecords++; } @@ -282,25 +325,23 @@ public class TfUtils implements Serializable{ * @param w * @return */ - public boolean isNA(String w) { - if(_NAstrings == null) + public static boolean isNA(String[] NAstrings, String w) { + if(NAstrings == null) return false; - for(String na : _NAstrings) { + for(String na : NAstrings) { if(w.equals(na)) return true; } return false; } - public String[] getWords(Text line) - { + public String[] getWords(Text line) { return getWords(line.toString()); } - public String[] getWords(String line) - { + public String[] getWords(String line) { return getDelim().split(line.trim(), -1); } @@ -315,10 +356,10 @@ public class TfUtils implements Serializable{ String[] words = getWords(line); if(!getOmitAgent().omit(words, this)) { - getMVImputeAgent().prepare(words, this); + getMVImputeAgent().prepare(words); getRecodeAgent().prepare(words, this); getBinAgent().prepare(words, this); - incrValid();; + incrValid(); } incrTotal(); @@ -354,30 +395,10 @@ public class TfUtils implements Serializable{ // associate recode maps and bin definitions with dummycoding agent, // as recoded and binned columns are typically dummycoded getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() ); - getDummycodeAgent().setNumBins(getBinAgent().getBinList(), getBinAgent().getNumBins()); + getDummycodeAgent().setNumBins(getBinAgent().getColList(), getBinAgent().getNumBins()); 
getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this); } - - /*public void loadTfMetadata () throws IOException - { - Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0]; - FileSystem localFS = FileSystem.getLocal(_rJob); - - loadTfMetadata(_rJob, localFS, tfMtdDir); - - FileSystem fs; - fs = FileSystem.get(_rJob); - Path thisPath=new Path(_rJob.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs); - String thisfile=thisPath.toString(); - - Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs); - if(thisfile.toString().equals(smallestFilePath.toString())) - _partFileWithHeader=true; - else - _partFileWithHeader = false; - }*/ - public String processHeaderLine() throws IOException { @@ -402,11 +423,6 @@ public class TfUtils implements Serializable{ return getOmitAgent().omit(words, this); } - - public String[] apply(String[] words) { - return apply(words, false); - } - /** * Function to apply transformation metadata on a given row. 
* @@ -414,18 +430,11 @@ public class TfUtils implements Serializable{ * @param optimizeMaps * @return */ - public String[] apply ( String[] words, boolean optimizeMaps ) - { - words = getMVImputeAgent().apply(words, this); - - if(optimizeMaps) - // specific case of transform() invoked from CP (to save boxing and unboxing) - words = getRecodeAgent().cp_apply(words, this); - else - words = getRecodeAgent().apply(words, this); - - words = getBinAgent().apply(words, this); - words = getDummycodeAgent().apply(words, this); + public String[] apply( String[] words ) { + words = getMVImputeAgent().apply(words); + words = getRecodeAgent().apply(words); + words = getBinAgent().apply(words); + words = getDummycodeAgent().apply(words); _numTransformedRows++; return words; @@ -443,8 +452,7 @@ public class TfUtils implements Serializable{ } } - public String checkAndPrepOutputString(String []words) throws DMLRuntimeException - { + public String checkAndPrepOutputString(String []words) throws DMLRuntimeException { return checkAndPrepOutputString(words, new StringBuilder()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java deleted file mode 100644 index 0cdf682..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; - -public abstract class TransformationAgent implements Serializable { - - private static final long serialVersionUID = -2995384194257356337L; - - public static enum TX_METHOD { - IMPUTE ("impute"), - RECODE ("recode"), - BIN ("bin"), - DUMMYCODE ("dummycode"), - SCALE ("scale"), - OMIT ("omit"), - MVRCD ("mvrcd"); - - private String _name; - - TX_METHOD(String name) { _name = name; } - - public String toString() { - return _name; - } - } - - public static final String JSON_ATTRS = "attributes"; - public static final String JSON_MTHD = "methods"; - public static final String JSON_CONSTS = "constants"; - public static final String JSON_NBINS = "numbins"; - - protected static final String MV_FILE_SUFFIX = ".impute"; - protected static final String MODE_FILE_SUFFIX = ".mode"; - protected static final String BIN_FILE_SUFFIX = ".bin"; - protected static final String SCALE_FILE_SUFFIX = ".scale"; - protected static final String DCD_FILE_NAME = "dummyCodeMaps.csv"; - - protected static final String DCD_NAME_SEP = "_"; - - - abstract public void 
print(); - abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException; - abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException; - - abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException; - abstract public String[] apply(String[] words, TfUtils agents); - - protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID } - protected byte columnTypeToID(ColumnTypes type) throws IOException { - switch(type) - { - case SCALE: return 1; - case NOMINAL: return 2; - case ORDINAL: return 3; - case DUMMYCODED: return 1; // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically. - default: - throw new IOException("Invalid Column Type: " + type); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java index 1999f10..f66ed5e 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java @@ -44,8 +44,9 @@ public abstract class Decoder * * @param in * @param out + * @return */ - public abstract void decode(double[] in, Object[] out); + public abstract Object[] decode(double[] in, Object[] out); /** * Block decode API converting a matrix block into a frame block. 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java index 4ad11ed..b0b0909 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java @@ -47,9 +47,10 @@ public class DecoderComposite extends Decoder } @Override - public void decode(double[] in, Object[] out) { + public Object[] decode(double[] in, Object[] out) { for( Decoder decoder : _decoders ) decoder.decode(in, out); + return out; } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java index e6560c6..cdf9a52 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java @@ -26,8 +26,7 @@ import java.util.List; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; -import org.apache.sysml.runtime.transform.TransformationAgent; -import org.apache.sysml.runtime.transform.TransformationAgent.TX_METHOD; +import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONObject; @@ -59,14 +58,14 @@ public class 
DecoderFactory } //create decoder 'recode' - if ( jSpec.containsKey(TX_METHOD.RECODE.toString())) { + if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { JSONArray attrs = null; - if( jSpec.get(TX_METHOD.RECODE.toString()) instanceof JSONObject ) { - JSONObject obj = (JSONObject) jSpec.get(TX_METHOD.RECODE.toString()); - attrs = (JSONArray) obj.get(TransformationAgent.JSON_ATTRS); + if( jSpec.get(TfUtils.TXMETHOD_RECODE) instanceof JSONObject ) { + JSONObject obj = (JSONObject) jSpec.get(TfUtils.TXMETHOD_RECODE); + attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); } else - attrs = (JSONArray)jSpec.get(TX_METHOD.RECODE.toString()); + attrs = (JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE); int[] rcCols = new int[attrs.size()]; for(int j=0; j<rcCols.length; j++) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java index 0c9a872..f554f8b 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java @@ -61,11 +61,12 @@ public class DecoderRecode extends Decoder } @Override - public void decode(double[] in, Object[] out) { + public Object[] decode(double[] in, Object[] out) { for( int j=0; j<_rcCols.length; j++ ) { long key = UtilFunctions.toLong(in[_rcCols[j]]); out[_rcCols[j]] = _rcMaps[j].get(key); } + return out; } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java new file mode 100644 index 0000000..32694c9 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform.encode; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.sysml.runtime.transform.DistinctValue; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.wink.json4j.JSONArray; + +/** + * Base class for all transform encoders providing both a row and block + * interface for encoding frames to matrices. 
+ * + */ +public abstract class Encoder implements Serializable +{ + private static final long serialVersionUID = 2299156350718979064L; + + protected int[] _colList = null; + + protected Encoder( int[] colList ) { + _colList = colList; + } + + public int[] getColList() { + return _colList; + } + + /** + * + * @param attrs + */ + public int initColList(JSONArray attrs) { + _colList = new int[attrs.size()]; + for(int i=0; i < _colList.length; i++) + _colList[i] = UtilFunctions.toInt(attrs.get(i)); + return _colList.length; + } + + /** + * Indicates if this encoder is applicable, i.e., if there is at + * least one column to encode. + * + * @return + */ + public boolean isApplicable() { + return (_colList != null && _colList.length > 0); + } + + /** + * Indicates if this encoder is applicable for the given column ID, + * i.e., if it is subject to this transformation. + * + */ + public int isApplicable(int colID) { + if(_colList == null) + return -1; + int idx = Arrays.binarySearch(_colList, colID); + return ( idx >= 0 ? idx : -1); + } + + /** + * Encode input data according to existing transform meta + * data (transform apply). + * + * @param in + * @return + */ + public abstract String[] apply(String[] in); + + + //OLD API: kept for a transition phase only + //TODO stage 2: refactor data and meta data IO into minimal set of utility functions + abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException; + abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException; + abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException; +}
