Repository: incubator-systemml Updated Branches: refs/heads/master 55c8ee7d6 -> aa83a6f44
[SYSTEMML-569] Extended frame transform apply (dummy, bin, omit, impute) This patch extends the supported frame transform apply functionality in CP/Spark from recoding to recoding, dummy coding, value omitting, and missing value imputation. Furthermore this also includes new (isolated) test cases and an extension of the meta data frame read utility. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8f7e8cca Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8f7e8cca Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8f7e8cca Branch: refs/heads/master Commit: 8f7e8cca76c8f4be6af69cc4001aab5ec977e3e6 Parents: 55c8ee7 Author: Matthias Boehm <[email protected]> Authored: Fri Jun 17 21:19:09 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jun 17 22:39:24 2016 -0700 ---------------------------------------------------------------------- .../ParameterizedBuiltinSPInstruction.java | 84 +++++- .../instructions/spark/utils/SparkUtils.java | 12 + .../sysml/runtime/matrix/data/FrameBlock.java | 82 +++++- .../sysml/runtime/transform/BinAgent.java | 110 ++++++-- .../sysml/runtime/transform/DataTransform.java | 4 +- .../sysml/runtime/transform/DummycodeAgent.java | 69 +++-- .../sysml/runtime/transform/MVImputeAgent.java | 64 ++++- .../sysml/runtime/transform/OmitAgent.java | 66 ++++- .../sysml/runtime/transform/RecodeAgent.java | 40 +-- .../apache/sysml/runtime/transform/TfUtils.java | 26 +- .../sysml/runtime/transform/encode/Encoder.java | 23 +- .../transform/encode/EncoderComposite.java | 20 +- .../transform/encode/EncoderFactory.java | 81 ++++-- .../transform/encode/EncoderPassThrough.java | 15 +- .../runtime/transform/meta/TfMetaUtils.java | 278 +++++++++++++++---- .../runtime/transform/meta/TfOffsetMap.java | 72 +++++ .../sysml/runtime/util/UtilFunctions.java | 17 ++ .../functions/jmlc/FrameReadMetaTest.java | 50 ++-- 
.../functions/transform/TransformFrameTest.java | 60 +++- .../org/apache/sysml/test/utils/TestUtils.java | 4 +- .../input/homes3/homes.tfspec_bin.json | 5 + .../input/homes3/homes.tfspec_dummy.json | 2 + .../input/homes3/homes.tfspec_impute.json | 10 + .../input/homes3/homes.tfspec_omit.json | 2 + .../functions/transform/ZPackageSuite.java | 1 + 25 files changed, 948 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index ae77c35..0d45ae1 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -25,6 +25,7 @@ import java.util.HashMap; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; @@ -70,11 +71,15 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTy import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.SimpleOperator; import org.apache.sysml.runtime.transform.DataTransform; +import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.transform.encode.EncoderFactory; +import 
org.apache.sysml.runtime.transform.meta.TfMetaUtils; +import org.apache.sysml.runtime.transform.meta.TfOffsetMap; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.UtilFunctions; + public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction { protected HashMap<String,String> params; @@ -419,18 +424,26 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction FrameBlock meta = sec.getFrameInput(params.get("meta")); MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(params.get("target")); MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); - mcOut.setDimension(mcIn.getRows(), mcIn.getCols()); //TODO encoder awareness + //compute omit offset map for block shifts + TfOffsetMap omap = null; + if( TfMetaUtils.containsOmitSpec(params.get("spec")) ) { + omap = new TfOffsetMap(SparkUtils.toIndexedLong(in.mapToPair( + new RDDTransformApplyOffsetFunction(params.get("spec"))).collect())); + } + //create encoder broadcast (avoiding replication per task) Encoder encoder = EncoderFactory.createEncoder(params.get("spec"), fo.getSchema(), (int)fo.getNumColumns(), meta); + mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), encoder.getNumCols()); Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder); - + Broadcast<TfOffsetMap> bomap = (omap!=null) ? 
sec.getSparkContext().broadcast(omap) : null; + //execute transform apply JavaPairRDD<Long,FrameBlock> tmp = in - .mapValues(new RDDTransformApplyFunction(bmeta)); + .mapToPair(new RDDTransformApplyFunction(bmeta, bomap)); JavaPairRDD<MatrixIndexes,MatrixBlock> out = FrameRDDConverterUtils - .binaryBlockToMatrixBlock(tmp, mcIn, mcOut); + .binaryBlockToMatrixBlock(tmp, mcOut, mcOut); //set output and maintain lineage/output characteristics sec.setRDDHandleForVariable(output.getName(), out); @@ -685,26 +698,79 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction /** * */ - public static class RDDTransformApplyFunction implements Function<FrameBlock,FrameBlock> + public static class RDDTransformApplyFunction implements PairFunction<Tuple2<Long,FrameBlock>,Long,FrameBlock> { private static final long serialVersionUID = 5759813006068230916L; private Broadcast<Encoder> _bencoder = null; + private Broadcast<TfOffsetMap> _omap = null; - public RDDTransformApplyFunction(Broadcast<Encoder> bencoder) { + public RDDTransformApplyFunction(Broadcast<Encoder> bencoder, Broadcast<TfOffsetMap> omap) { _bencoder = bencoder; + _omap = omap; } @Override - public FrameBlock call(FrameBlock in) + public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> in) throws Exception { + long key = in._1(); + FrameBlock blk = in._2(); + //execute block transform apply Encoder encoder = _bencoder.getValue(); - MatrixBlock tmp = encoder.apply(in, new MatrixBlock(in.getNumRows(), in.getNumColumns(), false)); + MatrixBlock tmp = encoder.apply(blk, new MatrixBlock(blk.getNumRows(), blk.getNumColumns(), false)); + + //remap keys + if( _omap != null ) { + key = _omap.getValue().getOffset(key); + } //convert to frameblock to reuse frame-matrix reblock - return DataConverter.convertToFrameBlock(tmp); + return new Tuple2<Long, FrameBlock>(key, + DataConverter.convertToFrameBlock(tmp)); + } + } + + /** + * + */ + public static class RDDTransformApplyOffsetFunction implements 
PairFunction<Tuple2<Long,FrameBlock>,Long,Long> + { + private static final long serialVersionUID = 3450977356721057440L; + + private int[] _omitColList = null; + + public RDDTransformApplyOffsetFunction(String spec) { + try { + _omitColList = TfMetaUtils.parseJsonIDList(spec, TfUtils.TXMETHOD_OMIT); + } + catch (DMLRuntimeException e) { + throw new RuntimeException(e); + } + } + + @Override + public Tuple2<Long,Long> call(Tuple2<Long, FrameBlock> in) + throws Exception + { + long key = in._1(); + long rmRows = 0; + + FrameBlock blk = in._2(); + + for( int i=0; i<blk.getNumRows(); i++ ) { + boolean valid = true; + for( int j=0; j<_omitColList.length; j++ ) { + int colID = _omitColList[j]; + Object val = blk.get(i, colID-1); + valid &= !(val==null || (blk.getSchema().get(colID-1)== + ValueType.STRING && val.toString().isEmpty())); + } + rmRows += valid ? 0 : 1; + } + + return new Tuple2<Long, Long>(key, rmRows); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java index 34db095..98975da 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java @@ -117,6 +117,18 @@ public class SparkUtils return ret; } + /** + * + * @param in + * @return + */ + public static ArrayList<Pair<Long,Long>> toIndexedLong( List<Tuple2<Long, Long>> in ) { + ArrayList<Pair<Long, Long>> ret = new ArrayList<Pair<Long, Long>>(); + for( Tuple2<Long, Long> e : in ) + ret.add(new Pair<Long,Long>(e._1(), e._2())); + return ret; + } + /** * 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 4124c03..0065fe8 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -62,6 +62,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable /** The column names of the data frame as an ordered list of strings */ private List<String> _colnames = null; + private List<ColumnMetadata> _colmeta = null; + /** The data frame data as an ordered list of columns */ private List<Array> _coldata = null; @@ -72,6 +74,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable _numRows = 0; _schema = new ArrayList<ValueType>(); _colnames = new ArrayList<String>(); + _colmeta = new ArrayList<ColumnMetadata>(); _coldata = new ArrayList<Array>(); if( REUSE_RECODE_MAPS ) _rcdMapCache = new HashMap<Integer, SoftReference<HashMap<String,Long>>>(); @@ -104,6 +107,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable _numRows = 0; //maintained on append _schema = new ArrayList<ValueType>(schema); _colnames = new ArrayList<String>(names); + _colmeta = new ArrayList<ColumnMetadata>(); + for( int j=0; j<_schema.size(); j++ ) + _colmeta.add(new ColumnMetadata(0)); _coldata = new ArrayList<Array>(); for( int i=0; i<data.length; i++ ) appendRow(data[i]); @@ -157,6 +163,40 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable } /** + * + * @return + */ + public List<ColumnMetadata> getColumnMetadata() { + return _colmeta; + } + + /** + * + * @param c + * @return + */ + public ColumnMetadata getColumnMetadata(int c) { + return _colmeta.get(c); + } + + /** + * 
+ * @param colmeta + */ + public void setColumnMetadata(List<ColumnMetadata> colmeta) { + _colmeta = colmeta; + } + + /** + * + * @param c + * @param colmeta + */ + public void setColumnMetadata(int c, ColumnMetadata colmeta) { + _colmeta.set(c, colmeta); + } + + /** * Creates a mapping from column names to column IDs, i.e., * 1-based column indexes * @@ -177,6 +217,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable //early abort if already allocated if( _schema.size() == _coldata.size() ) return; + //allocate column meta data + for( int j=0; j<_schema.size(); j++ ) + _colmeta.add(new ColumnMetadata(0)); //allocate columns if necessary for( int j=0; j<_schema.size(); j++ ) { if( j >= _coldata.size() ) @@ -420,6 +463,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable for( int j=0; j<getNumColumns(); j++ ) { out.writeByte(_schema.get(j).ordinal()); out.writeUTF(_colnames.get(j)); + out.writeLong(_colmeta.get(j).getNumDistinct()); + out.writeUTF( (_colmeta.get(j).getMvValue()!=null) ? + _colmeta.get(j).getMvValue() : "" ); _coldata.get(j).write(out); } } @@ -429,12 +475,15 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable //read head (rows, cols) _numRows = in.readInt(); int numCols = in.readInt(); - //read columns (value type, data) + //read columns (value type, meta, data) _schema.clear(); + _colmeta.clear(); _coldata.clear(); for( int j=0; j<numCols; j++ ) { ValueType vt = ValueType.values()[in.readByte()]; String name = in.readUTF(); + long ndistinct = in.readLong(); + String mvvalue = in.readUTF(); Array arr = null; switch( vt ) { case STRING: arr = new StringArray(new String[_numRows]); break; @@ -446,6 +495,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable arr.readFields(in); _schema.add(vt); _colnames.add(name); + _colmeta.add(new ColumnMetadata(ndistinct, + mvvalue.isEmpty() ? 
null : mvvalue)); _coldata.add(arr); } } @@ -1067,4 +1118,33 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable return new DoubleArray(Arrays.copyOfRange(_data,rl,ru+1)); } } + + /** + * + */ + public static class ColumnMetadata { + private long _ndistinct = -1; + private String _mvValue = null; + + public ColumnMetadata(long ndistinct, String mvval) { + _ndistinct = ndistinct; + _mvValue = mvval; + } + public ColumnMetadata(long ndistinct) { + _ndistinct = ndistinct; + } + + public long getNumDistinct() { + return _ndistinct; + } + public void setNumDistinct(long ndistinct) { + _ndistinct = ndistinct; + } + public String getMvValue() { + return _mvValue; + } + public void setMvValue(String mvVal) { + _mvValue = mvVal; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java index 7e6c9c8..3ed9a1d 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java @@ -28,7 +28,9 @@ import java.nio.charset.CharacterCodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; +import java.util.List; +import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -37,11 +39,13 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; +import org.apache.sysml.lops.Lop; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import 
org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.UtilFunctions; public class BinAgent extends Encoder @@ -56,33 +60,57 @@ public class BinAgent extends Encoder private double[] _min=null, _max=null; // min and max among non-missing values private double[] _binWidths = null; // width of a bin for each attribute - public BinAgent() { - super( null ); + //frame transform-apply attributes + private double[][] _binMins = null; + private double[][] _binMaxs = null; + + public BinAgent(int clen) { + super( null, clen ); + } + + public BinAgent(JSONObject parsedSpec, int clen) + throws JSONException, IOException + { + this(parsedSpec, clen, false); } - public BinAgent(JSONObject parsedSpec) - throws JSONException + /** + * + * @param parsedSpec + * @param clen + * @throws JSONException + * @throws IOException + */ + public BinAgent(JSONObject parsedSpec, int clen, boolean colsOnly) + throws JSONException, IOException { - super( null ); + super( null, clen ); if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) ) return; - JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN); - JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); - JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS); + if( colsOnly ) { + List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec); + initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0]))); + } + else + { + JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN); + JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); + JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS); + initColList(attrs); - initColList(attrs); - _numBins = new int[attrs.size()]; - for(int i=0; i < _numBins.length; i++) - _numBins[i] = UtilFunctions.toInt(nbins.get(i)); - - // initialize 
internal transformation metadata - _min = new double[_colList.length]; - Arrays.fill(_min, Double.MAX_VALUE); - _max = new double[_colList.length]; - Arrays.fill(_max, -Double.MAX_VALUE); - - _binWidths = new double[_colList.length]; + _numBins = new int[attrs.size()]; + for(int i=0; i < _numBins.length; i++) + _numBins[i] = UtilFunctions.toInt(nbins.get(i)); + + // initialize internal transformation metadata + _min = new double[_colList.length]; + Arrays.fill(_min, Double.MAX_VALUE); + _max = new double[_colList.length]; + Arrays.fill(_max, -Double.MAX_VALUE); + + _binWidths = new double[_colList.length]; + } } public int[] getNumBins() { return _numBins; } @@ -173,7 +201,7 @@ public class BinAgent extends Encoder private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { - Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX); + Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); br.close(); @@ -263,7 +291,7 @@ public class BinAgent extends Encoder for(int i=0; i<_colList.length;i++) { int colID = _colList[i]; - Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX); + Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); @@ -320,7 +348,17 @@ public class BinAgent extends Encoder @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out) { - return null; + for(int j=0; j<_colList.length; j++) { + int colID = _colList[j]; + for( int i=0; i<in.getNumRows(); i++ ) { + double 
inVal = UtilFunctions.objectToDouble( + in.getSchema().get(colID-1), in.get(i, colID-1)); + int ix = Arrays.binarySearch(_binMaxs[j], inVal); + int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1; + out.quickSetValue(i, colID-1, binID); + } + } + return out; } @Override @@ -331,20 +369,18 @@ public class BinAgent extends Encoder @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - // TODO Auto-generated method stub - return null; + build(in); + return apply(in, out); } @Override public void build(String[] in) { // TODO Auto-generated method stub - } @Override public void build(FrameBlock in) { // TODO Auto-generated method stub - } @Override @@ -352,4 +388,24 @@ public class BinAgent extends Encoder // TODO Auto-generated method stub return null; } + + /** + * + * @param meta + */ + public void initBins(FrameBlock meta) { + _binMins = new double[_colList.length][]; + _binMaxs = new double[_colList.length][]; + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based + int nbins = (int)meta.getColumnMetadata().get(colID-1).getNumDistinct(); + _binMins[j] = new double[nbins]; + _binMaxs[j] = new double[nbins]; + for( int i=0; i<nbins; i++ ) { + String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX); + _binMins[j][i] = Double.parseDouble(tmp[0]); + _binMaxs[j][i] = Double.parseDouble(tmp[1]); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java index 9767e50..c17b662 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java @@ -678,7 +678,7 @@ public class DataTransform { int id = 
UtilFunctions.toInt(o); - Path binpath = new Path( tfMtdPath + "/Bin/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.BIN_FILE_SUFFIX); + Path binpath = new Path( tfMtdPath + "/Bin/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_BIN_FILE_SUFFIX); Path rcdpath = new Path( tfMtdPath + "/Recode/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_RCD_DISTINCT_SUFFIX); if ( TfUtils.checkValidInputFile(fs, binpath, false ) ) @@ -698,7 +698,7 @@ public class DataTransform else throw new DMLRuntimeException("Relevant transformation metadata for column (id=" + id + ", name=" + columnNames[id-1] + ") is not found."); } - //System.out.println("Number of columns in transformed data: " + ret); + return ret; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java index a25444d..bcb06df 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java @@ -42,8 +42,8 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.UtilFunctions; -import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; @@ -51,8 +51,6 @@ public class DummycodeAgent extends Encoder { private static final long serialVersionUID = 5832130477659116489L; - private long numCols = 0; - private HashMap<Integer, HashMap<String,String>> _finalMaps = null; private 
HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null; private int[] _binList = null; @@ -62,18 +60,22 @@ public class DummycodeAgent extends Encoder private int[] _dcdColumnMap = null; // to help in translating between original and dummycoded column IDs private long _dummycodedLength = 0; // #of columns after dummycoded - public DummycodeAgent(int[] list) { - super(list); + public DummycodeAgent(int[] list, int clen) { + super(list, clen); } - public DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException { - super(null); - numCols = ncol; + public DummycodeAgent(JSONObject parsedSpec, int clen) throws JSONException { + super(null, clen); - if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) - return; - JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_DUMMYCODE); - initColList( (JSONArray)obj.get(TfUtils.JSON_ATTRS) ); + if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) { + int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, TfUtils.TXMETHOD_DUMMYCODE); + initColList(collist); + } + } + + @Override + public int getNumCols() { + return (int)_dummycodedLength; } /** @@ -335,7 +337,7 @@ public class DummycodeAgent extends Encoder @Override public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { if ( !isApplicable() ) { - _dummycodedLength = numCols; + _dummycodedLength = _clen; return; } @@ -343,7 +345,7 @@ public class DummycodeAgent extends Encoder Arrays.sort(_colList); _domainSizes = new int[_colList.length]; - _dummycodedLength = numCols; + _dummycodedLength = _clen; //HashMap<String, String> map = null; for(int i=0; i<_colList.length; i++) { @@ -418,30 +420,63 @@ public class DummycodeAgent extends Encoder } @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out) { - return null; + public MatrixBlock apply(FrameBlock in, MatrixBlock out) + { + MatrixBlock ret = new MatrixBlock(out.getNumRows(), (int)_dummycodedLength, false); + + for( int i=0; 
i<out.getNumRows(); i++ ) { + for(int colID=1, idx=0, ncolID=1; colID <= out.getNumColumns(); colID++) { + double val = out.quickGetValue(i, colID-1); + if(idx < _colList.length && colID==_colList[idx]) { + ret.quickSetValue(i, ncolID-1+(int)val-1, 1); + ncolID += _domainSizes[idx]; + idx++; + } + else { + double ptval = UtilFunctions.objectToDouble(in.getSchema().get(colID-1), in.get(i, colID-1)); + ret.quickSetValue(i, ncolID-1, ptval); + ncolID++; + } + } + } + + return ret; } @Override public double[] encode(String[] in, double[] out) { + //TODO return null; } @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - return null; + return apply(in, out); } @Override public void build(String[] in) { + //do nothing } @Override public void build(FrameBlock in) { + //do nothing } @Override public FrameBlock getMetaData(FrameBlock out) { return null; } + + public void initDomainSizes(FrameBlock meta) { + //initialize domain sizes and output num columns + _domainSizes = new int[_colList.length]; + _dummycodedLength = _clen; + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based + _domainSizes[j] = (int)meta.getColumnMetadata().get(colID-1).getNumDistinct(); + _dummycodedLength += _domainSizes[j]; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java index da3887d..1aeb465 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; @@ -50,6 +51,7 @@ import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.matrix.operators.CMOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.UtilFunctions; public class MVImputeAgent extends Encoder @@ -102,10 +104,19 @@ public class MVImputeAgent extends Encoder public KahanObject[] getMeans_scnomv() { return _scnomvMeanList; } public CM_COV_Object[] getVars_scnomv() { return _scnomvVarList; } - public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings) + public MVImputeAgent(JSONObject parsedSpec, int clen) + throws JSONException + { + super(null, clen); + int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, TfUtils.TXMETHOD_IMPUTE); + initColList(collist); + + } + + public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings, int clen) throws JSONException { - super(null); + super(null, clen); boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE); boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE); _NAstrings = NAstrings; @@ -518,7 +529,7 @@ public class MVImputeAgent extends Encoder private void writeTfMtd(int colID, String mean, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { - Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + TfUtils.MV_FILE_SUFFIX); + Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + TfUtils.TXMTD_MV_FILE_SUFFIX); BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + TfUtils.TXMTD_SEP + mean + "\n"); br.close(); @@ -534,7 +545,7 @@ public class MVImputeAgent extends Encoder private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { - Path pt = new 
Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX); + Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); br.close(); @@ -799,7 +810,7 @@ public class MVImputeAgent extends Encoder private String readReplacement(int colID, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - Path path = new Path( txMtdDir + "/Impute/" + agents.getName(colID) + TfUtils.MV_FILE_SUFFIX); + Path path = new Path( txMtdDir + "/Impute/" + agents.getName(colID) + TfUtils.TXMTD_MV_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); @@ -915,11 +926,6 @@ public class MVImputeAgent extends Encoder return words; } - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out) { - return null; - } - public MVMethod getMethod(int colID) { int idx = isApplicable(colID); if(idx == -1) @@ -944,6 +950,20 @@ public class MVImputeAgent extends Encoder int idx = isApplicable(colID); return (idx == -1) ? 
null : _replacementList[idx]; } + + + @Override + public MatrixBlock apply(FrameBlock in, MatrixBlock out) { + for(int i=0; i<in.getNumRows(); i++) { + for(int j=0; j<_colList.length; j++) { + int colID = _colList[j]; + if( Double.isNaN(out.quickGetValue(i, colID-1)) ) + out.quickSetValue(i, colID-1, Double.parseDouble(_replacementList[j])); + } + } + return out; + } + @Override public double[] encode(String[] in, double[] out) { // TODO Auto-generated method stub @@ -969,4 +989,28 @@ public class MVImputeAgent extends Encoder // TODO Auto-generated method stub return null; } + + /** + * + * @param meta + * @param rcList + */ + public void initReplacementList(FrameBlock meta, List<Integer> rcList) { + //init replacement lists, replace recoded values to + //apply mv imputation potentially after recoding + _replacementList = new String[_colList.length]; + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; + String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); + if( rcList.contains(colID) ) { + Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal); + if( mvVal2 == null) + throw new RuntimeException("Missing recode value for impute value '"+mvVal+"'."); + _replacementList[j] = mvVal2.toString(); + } + else { + _replacementList[j] = mvVal; + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java index b83cbaa..539ff30 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java @@ -27,34 +27,40 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.JobConf; import 
org.apache.hadoop.mapred.OutputCollector; -import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.UtilFunctions; public class OmitAgent extends Encoder { private static final long serialVersionUID = 1978852120416654195L; - public OmitAgent() { - super(null); + private int _rmRows = 0; + + public OmitAgent(int clen) { + super(null, clen); } - public OmitAgent(int[] list) { - super(list); + public OmitAgent(int[] list, int clen) { + super(list, clen); } - public OmitAgent(JSONObject parsedSpec) + public OmitAgent(JSONObject parsedSpec, int clen) throws JSONException { - super(null); + super(null, clen); if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT)) return; - JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_OMIT); - initColList((JSONArray) obj.get(TfUtils.JSON_ATTRS)); + int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, TfUtils.TXMETHOD_OMIT); + initColList(collist); + } + + public int getNumRemovedRows() { + return _rmRows; } public boolean omit(String[] words, TfUtils agents) @@ -93,8 +99,37 @@ public class OmitAgent extends Encoder } @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out) { - return null; + public MatrixBlock apply(FrameBlock in, MatrixBlock out) + { + //determine output size + int numRows = 0; + for(int i=0; i<out.getNumRows(); i++) { + boolean valid = true; + for(int j=0; j<_colList.length; j++) + valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1)); + numRows += valid ? 
1 : 0; + } + + //copy over valid rows into the output + MatrixBlock ret = new MatrixBlock(numRows, out.getNumColumns(), false); + int pos = 0; + for(int i=0; i<in.getNumRows(); i++) { + //determine if valid row or omit + boolean valid = true; + for(int j=0; j<_colList.length; j++) + valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1)); + //copy row if necessary + if( valid ) { + for(int j=0; j<out.getNumColumns(); j++) + ret.quickSetValue(pos, j, out.quickGetValue(i, j)); + pos++; + } + } + + //keep info an remove rows + _rmRows = out.getNumRows() - pos; + + return ret; } @Override @@ -104,22 +139,23 @@ public class OmitAgent extends Encoder @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - return null; + return apply(in, out); } @Override public void build(String[] in) { - + //do nothing } @Override public void build(FrameBlock in) { - + //do nothing } @Override public FrameBlock getMetaData(FrameBlock out) { - return null; + //do nothing + return out; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java index 6170412..a44ad5a 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java @@ -43,8 +43,8 @@ import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; import org.apache.sysml.runtime.transform.decode.DecoderRecode; import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.UtilFunctions; -import org.apache.wink.json4j.JSONArray; import 
org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; @@ -59,37 +59,23 @@ public class RecodeAgent extends Encoder private HashMap<Integer, HashMap<String, Long>> _rcdMaps = new HashMap<Integer, HashMap<String, Long>>(); private HashMap<Integer, HashMap<String,String>> _finalMaps = null; - public RecodeAgent(JSONObject parsedSpec) + public RecodeAgent(JSONObject parsedSpec, int clen) throws JSONException { - super(null); - int rcdCount = 0; - if ( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE)) - { - //TODO consolidate external and internal json spec definitions - JSONArray attrs = null; - if( parsedSpec.get(TfUtils.TXMETHOD_RECODE) instanceof JSONObject ) { - JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_RECODE); - attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); - } - else - attrs = (JSONArray)parsedSpec.get(TfUtils.TXMETHOD_RECODE); - rcdCount = initColList(attrs); + super(null, clen); + int rcdCount = 0; + + if( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) { + int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, TfUtils.TXMETHOD_RECODE); + rcdCount = initColList(collist); } - if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) - { - JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_MVRCD); - JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); - - _mvrcdList = new int[attrs.size()]; - for(int i=0; i < _mvrcdList.length; i++) - _mvrcdList[i] = UtilFunctions.toInt(attrs.get(i)); - rcdCount += attrs.size(); + if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) { + _mvrcdList = TfMetaUtils.parseJsonIDList(parsedSpec, TfUtils.TXMETHOD_MVRCD); + rcdCount += _mvrcdList.length; } - if ( rcdCount > 0 ) - { + if ( rcdCount > 0 ) { _fullrcdList = new int[rcdCount]; int idx = -1; if(_colList != null) @@ -305,7 +291,7 @@ public class RecodeAgent extends Encoder if (isModeImputed) { - pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) + TfUtils.MV_FILE_SUFFIX); + pt=new Path(outputDir+"/Impute/"+ 
agents.getName(colID) + TfUtils.TXMTD_MV_FILE_SUFFIX); br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); br.write(colID + "," + UtilFunctions.quote(mode)); br.close(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java index 16ccf6e..24bde22 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java +++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java @@ -83,20 +83,20 @@ public class TfUtils implements Serializable{ public static final String TXMETHOD_MVRCD = "mvrcd"; //transform meta data constants - public static final String TXMTD_SEP = ","; - public static final String TXMTD_COLTYPES = "coltypes.csv"; + public static final String TXMTD_SEP = ","; + public static final String TXMTD_COLTYPES = "coltypes.csv"; public static final String TXMTD_COLNAMES = "column.names"; public static final String TXMTD_DC_COLNAMES = "dummycoded.column.names"; - public static final String TXMTD_RCD_MAP_SUFFIX = ".map"; + public static final String TXMTD_RCD_MAP_SUFFIX = ".map"; public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct"; + public static final String TXMTD_BIN_FILE_SUFFIX = ".bin"; + public static final String TXMTD_MV_FILE_SUFFIX = ".impute"; public static final String JSON_ATTRS = "attributes"; public static final String JSON_MTHD = "methods"; public static final String JSON_CONSTS = "constants"; public static final String JSON_NBINS = "numbins"; - protected static final String MV_FILE_SUFFIX = ".impute"; protected static final String MODE_FILE_SUFFIX = ".mode"; - protected static final String BIN_FILE_SUFFIX = ".bin"; protected static final String SCALE_FILE_SUFFIX = ".scale"; protected static final String DCD_FILE_NAME = 
"dummyCodeMaps.csv"; protected static final String DCD_NAME_SEP = "_"; @@ -119,7 +119,7 @@ public class TfUtils implements Serializable{ private String _delimString = null; private String[] _NAstrings = null; private String[] _outputColumnNames = null; - private long _numInputCols = -1; + private int _numInputCols = -1; private String _tfMtdDir = null; private String _spec = null; @@ -135,7 +135,7 @@ public class TfUtils implements Serializable{ } _NAstrings = TfUtils.parseNAStrings(job); _spec = job.get(MRJobConfiguration.TF_SPEC); - _oa = new OmitAgent(new JSONObject(_spec)); + _oa = new OmitAgent(new JSONObject(_spec), -1); } // called from GenTFMtdMapper, ApplyTf (Hadoop) @@ -176,7 +176,7 @@ public class TfUtils implements Serializable{ throws IOException, JSONException { //TODO recodemaps handover - _numInputCols = inNcol; + _numInputCols = (int)inNcol; createAgents(spec, new String[]{}); } @@ -244,10 +244,10 @@ public class TfUtils implements Serializable{ private void createAgents(JSONObject spec, String[] naStrings) throws IOException, JSONException { - _oa = new OmitAgent(spec); - _mia = new MVImputeAgent(spec, naStrings); - _ra = new RecodeAgent(spec); - _ba = new BinAgent(spec); + _oa = new OmitAgent(spec, _numInputCols); + _mia = new MVImputeAgent(spec, naStrings, _numInputCols); + _ra = new RecodeAgent(spec, _numInputCols); + _ba = new BinAgent(spec, _numInputCols); _da = new DummycodeAgent(spec, _numInputCols); } @@ -279,7 +279,7 @@ public class TfUtils implements Serializable{ _delimString = delim; _delim = Pattern.compile(Pattern.quote(delim)); _NAstrings = naStrings; - _numInputCols = numCols; + _numInputCols = (int)numCols; _offsetFile = offsetFile; _tmpDir = tmpPath; _outputPath = outputPath; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java index 98684da..ac01357 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java @@ -45,16 +45,27 @@ public abstract class Encoder implements Serializable { private static final long serialVersionUID = 2299156350718979064L; + protected int _clen = -1; protected int[] _colList = null; - protected Encoder( int[] colList ) { + protected Encoder( int[] colList, int clen ) { _colList = colList; + _clen = clen; } public int[] getColList() { return _colList; } + public void setColList(int[] colList) { + _colList = colList; + } + + public int getNumCols() { + return _clen; + } + + /** * * @param attrs @@ -67,6 +78,16 @@ public abstract class Encoder implements Serializable } /** + * + * @param colList + * @return + */ + public int initColList(int[] colList) { + _colList = colList; + return _colList.length; + } + + /** * Indicates if this encoder is applicable, i.e, if there is at * least one column to encode.
* http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java index b3477ea..1c27350 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java @@ -47,26 +47,34 @@ public class EncoderComposite extends Encoder private List<Encoder> _encoders = null; protected EncoderComposite(List<Encoder> encoders) { - super(null); + super(null, -1); _encoders = encoders; } protected EncoderComposite(Encoder[] encoders) { - super(null); + super(null, -1); _encoders = Arrays.asList(encoders); } - + + @Override + public int getNumCols() { + int clen = 0; + for( Encoder encoder : _encoders ) + clen = Math.max(clen, encoder.getNumCols()); + return clen; + } + @Override public double[] encode(String[] in, double[] out) { for( Encoder encoder : _encoders ) - encoder.encode(in, out); + out = encoder.encode(in, out); return out; } @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) { for( Encoder encoder : _encoders ) - encoder.encode(in, out); + out = encoder.encode(in, out); return out; } @@ -99,7 +107,7 @@ public class EncoderComposite extends Encoder @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out) { for( Encoder encoder : _encoders ) - encoder.apply(in, out); + out = encoder.apply(in, out); return out; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java index 75fe639..c3959ca 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java @@ -20,15 +20,23 @@ package org.apache.sysml.runtime.transform.encode; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.transform.BinAgent; +import org.apache.sysml.runtime.transform.DummycodeAgent; +import org.apache.sysml.runtime.transform.MVImputeAgent; +import org.apache.sysml.runtime.transform.OmitAgent; import org.apache.sysml.runtime.transform.RecodeAgent; import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; +import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.wink.json4j.JSONObject; public class EncoderFactory @@ -66,43 +74,66 @@ public class EncoderFactory * @return * @throws DMLRuntimeException */ + @SuppressWarnings("unchecked") public static Encoder createEncoder(String spec, List<ValueType> schema, FrameBlock meta) throws DMLRuntimeException { Encoder encoder = null; + int clen = schema.size(); try { //parse transform specification JSONObject jSpec = new JSONObject(spec); List<Encoder> lencoders = new ArrayList<Encoder>(); - //create encoders 'recode' and 'pass-through' - if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { - RecodeAgent ra = new RecodeAgent(jSpec); + //prepare basic id lists (recode, dummycode, pass-through) + //note: any dummycode column requires recode as preparation + List<Integer> rcIDs = 
Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_RECODE))); + List<Integer> dcIDs = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_DUMMYCODE))); + rcIDs = new ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs)); + List<Integer> binIDs = TfMetaUtils.parseBinningColIDs(jSpec); + List<Integer> ptIDs = new ArrayList<Integer>(CollectionUtils.subtract( + CollectionUtils.subtract(UtilFunctions.getSequenceList(1, clen, 1), rcIDs), binIDs)); + List<Integer> oIDs = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_OMIT))); + List<Integer> mvIDs = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonObjectIDList(jSpec, TfUtils.TXMETHOD_IMPUTE))); + + //create individual encoders + if( !rcIDs.isEmpty() ) { + RecodeAgent ra = new RecodeAgent(jSpec, clen); + ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0]))); if( meta != null ) ra.initRecodeMaps(meta); - lencoders.add(ra); - - //pass-through decode (non-recode columns) - int[] rcCols = ra.getColList(); - if( schema.size() > rcCols.length ) { - int[] ptCols = new int[schema.size()-rcCols.length]; - HashSet<Integer> probe = new HashSet<Integer>(); - for( int j=0; j<rcCols.length; j++ ) - probe.add(rcCols[j]-1); - for( int j=0, pos=0; j<schema.size(); j++ ) - if( !probe.contains(j) ) - ptCols[pos++] = j; - lencoders.add(new EncoderPassThrough(ptCols)); - } + lencoders.add(ra); + } + if( !ptIDs.isEmpty() ) { + lencoders.add(new EncoderPassThrough( + ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen)); + } + if( !dcIDs.isEmpty() ) { + DummycodeAgent da = new DummycodeAgent(jSpec, schema.size()); + if( meta != null ) + da.initDomainSizes(meta); + lencoders.add(da); + } + if( !binIDs.isEmpty() ) { + BinAgent ba = new BinAgent(jSpec, schema.size(), true); + if( meta != null ) + ba.initBins(meta); + lencoders.add(ba); + } + if( !oIDs.isEmpty() ) { + lencoders.add(new 
OmitAgent(jSpec, schema.size())); + } + if( !mvIDs.isEmpty() ) { + MVImputeAgent ma = new MVImputeAgent(jSpec, schema.size()); + if( meta != null ) + ma.initReplacementList(meta, rcIDs); + lencoders.add(ma); } - //create full 'pass-through' encoder if necessary - else { - int[] ptCols = new int[schema.size()]; - for( int j=0; j<ptCols.length; j++ ) - ptCols[j] = j; - lencoders.add(new EncoderPassThrough(ptCols)); - } //create composite decoder of all created decoders encoder = new EncoderComposite(lencoders); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java index e5a0c9f..445d019 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java @@ -44,15 +44,15 @@ public class EncoderPassThrough extends Encoder { private static final long serialVersionUID = -8473768154646831882L; - protected EncoderPassThrough(int[] ptCols) { - super(ptCols); //0-based indexes + protected EncoderPassThrough(int[] ptCols, int clen) { + super(ptCols, clen); //1-based } @Override public double[] encode(String[] in, double[] out) { for( int j=0; j<_colList.length; j++ ) { - String tmp = in[_colList[j]]; - out[_colList[j]] = (tmp==null) ? 0 : + String tmp = in[_colList[j]-1]; + out[_colList[j]-1] = (tmp==null) ? 
0 : Double.parseDouble(tmp); } @@ -87,12 +87,13 @@ public class EncoderPassThrough extends Encoder @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out) { for( int j=0; j<_colList.length; j++ ) { - int col = _colList[j]; + int col = _colList[j]-1; ValueType vt = in.getSchema().get(col); for( int i=0; i<in.getNumRows(); i++ ) { Object val = in.get(i, col); - out.quickSetValue(i, col, - UtilFunctions.objectToDouble(vt, val)); + out.quickSetValue(i, col, (val==null||(vt==ValueType.STRING + && val.toString().isEmpty())) ? Double.NaN : + UtilFunctions.objectToDouble(vt, val)); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java index 2536b23..b9feb5d 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java +++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java @@ -30,12 +30,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map.Entry; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.lops.Lop; import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; @@ -44,12 +46,98 @@ import org.apache.sysml.runtime.transform.decode.DecoderRecode; import org.apache.sysml.runtime.util.MapReduceTool; import 
org.apache.sysml.runtime.util.UtilFunctions; import org.apache.wink.json4j.JSONArray; +import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; public class TfMetaUtils { - private static final Log LOG = LogFactory.getLog(TfMetaUtils.class.getName()); - + /** + * + * @param spec + * @return + * @throws DMLRuntimeException + */ + public static boolean containsOmitSpec(String spec) throws DMLRuntimeException { + return (TfMetaUtils.parseJsonIDList(spec, TfUtils.TXMETHOD_OMIT).length > 0); + } + + /** + * + * @param spec + * @param group + * @return + * @throws JSONException + */ + public static int[] parseJsonIDList(String spec, String group) + throws DMLRuntimeException + { + try { + JSONObject jSpec = new JSONObject(spec); + return parseJsonIDList(jSpec, group); + } + catch(JSONException ex) { + throw new DMLRuntimeException(ex); + } + } + + /** + * TODO consolidate external and internal json spec definitions + * + * @param parsedSpec + * @param group + * @return + * @throws JSONException + */ + public static int[] parseJsonIDList(JSONObject spec, String group) + throws JSONException + { + int[] colList = new int[0]; + + if( spec.containsKey(group) ) { + //parse attribute-array or plain array of IDs + JSONArray attrs = null; + if( spec.get(group) instanceof JSONObject ) + attrs = (JSONArray) ((JSONObject)spec.get(group)).get(TfUtils.JSON_ATTRS); + else + attrs = (JSONArray)spec.get(group); + + //construct ID list array + colList = new int[attrs.size()]; + for(int i=0; i < colList.length; i++) + colList[i] = UtilFunctions.toInt(attrs.get(i)); + + //ensure ascending order of column IDs + Arrays.sort(colList); + } + + return colList; + } + + /** + * + * @param spec + * @param group + * @return + * @throws JSONException + */ + public static int[] parseJsonObjectIDList(JSONObject spec, String group) + throws JSONException + { + int[] colList = new int[0]; + + if( spec.containsKey(group) && spec.get(group) instanceof JSONArray ) + { + 
JSONArray colspecs = (JSONArray)spec.get(group); + colList = new int[colspecs.size()]; + for(int j=0; j<colspecs.size(); j++) { + JSONObject colspec = (JSONObject) colspecs.get(j); + colList[j] = colspec.getInt("id"); + } + } + + return colList; + } + /** * Reads transform meta data from an HDFS file path and converts it into an in-memory * FrameBlock object. @@ -63,41 +151,45 @@ public class TfMetaUtils public static FrameBlock readTransformMetaDataFromFile(String spec, String metapath, String colDelim) throws IOException { - //NOTE: this implementation assumes column alignment of colnames and coltypes - - //read column types (for sanity check column names) - String coltypesStr = MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLTYPES); - List<String> coltypes = Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP)); - //read column names String colnamesStr = MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLNAMES); List<String> colnames = Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim)); - if( coltypes.size() != colnames.size() ) { - LOG.warn("Number of columns names: "+colnames.size()+" (expected: "+coltypes.size()+")."); - LOG.warn("--Sample column names: "+(!colnames.isEmpty()?colnames.get(0):"null")); - } - //read meta data (currently only recode supported, without parsing spec) + //read meta data (currently supported: recode, dummycode, bin, omit, impute) + //note: recode/binning and impute might be applied on the same column HashMap<String,String> meta = new HashMap<String,String>(); + HashMap<String,String> mvmeta = new HashMap<String,String>(); int rows = 0; for( int j=0; j<colnames.size(); j++ ) { String colName = colnames.get(j); + //read recode maps for recoded or dummycoded columns String name = metapath+File.separator+"Recode"+File.separator+colName; if( MapReduceTool.existsFileOnHDFS(name+TfUtils.TXMTD_RCD_MAP_SUFFIX) ) { meta.put(colName, 
MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_MAP_SUFFIX)); String ndistinct = MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX); rows = Math.max(rows, Integer.parseInt(ndistinct)); } - else if( coltypes.get(j).equals("2") ) { - LOG.warn("Recode map for column '"+colName+"' does not exist."); + //read binning map for binned columns + String name2 = metapath+File.separator+"Bin"+File.separator+colName; + if( MapReduceTool.existsFileOnHDFS(name2+TfUtils.TXMTD_BIN_FILE_SUFFIX) ) { + String binmap = MapReduceTool.readStringFromHDFSFile(name2+TfUtils.TXMTD_BIN_FILE_SUFFIX); + meta.put(colName, binmap); + rows = Math.max(rows, Integer.parseInt(binmap.split(TfUtils.TXMTD_SEP)[4])); + } + //read impute value for mv columns + String name3 = metapath+File.separator+"Impute"+File.separator+colName; + if( MapReduceTool.existsFileOnHDFS(name3+TfUtils.TXMTD_MV_FILE_SUFFIX) ) { + String mvmap = MapReduceTool.readStringFromHDFSFile(name3+TfUtils.TXMTD_MV_FILE_SUFFIX); + mvmeta.put(colName, mvmap); } } //get list of recode ids - List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes); + List<Integer> recodeIDs = parseRecodeColIDs(spec); + List<Integer> binIDs = parseBinningColIDs(spec); //create frame block from in-memory strings - return convertToTransformMetaDataFrame(rows, recodeIDs, colnames, meta); + return convertToTransformMetaDataFrame(rows, colnames, recodeIDs, binIDs, meta, mvmeta); } /** @@ -113,25 +205,18 @@ public class TfMetaUtils public static FrameBlock readTransformMetaDataFromPath(String spec, String metapath, String colDelim) throws IOException { - //NOTE: this implementation assumes column alignment of colnames and coltypes - - //read column types (for sanity check column names) - String coltypesStr = IOUtilFunctions.toString(Connection.class.getResourceAsStream(metapath+"/"+TfUtils.TXMTD_COLTYPES)); - List<String> coltypes = Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP)); - //read column 
names String colnamesStr = IOUtilFunctions.toString(Connection.class.getResourceAsStream(metapath+"/"+TfUtils.TXMTD_COLNAMES)); List<String> colnames = Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim)); - if( coltypes.size() != colnames.size() ) { - LOG.warn("Number of columns names: "+colnames.size()+" (expected: "+coltypes.size()+")."); - LOG.warn("--Sample column names: "+(!colnames.isEmpty()?colnames.get(0):"null")); - } - //read meta data (currently only recode supported, without parsing spec) + //read meta data (currently supported: recode, dummycode, bin, omit) + //note: recode/binning and impute might be applied on the same column HashMap<String,String> meta = new HashMap<String,String>(); + HashMap<String,String> mvmeta = new HashMap<String,String>(); int rows = 0; for( int j=0; j<colnames.size(); j++ ) { String colName = colnames.get(j); + //read recode maps for recoded or dummycoded columns String name = metapath+"/"+"Recode"+"/"+colName; String map = IOUtilFunctions.toString(Connection.class.getResourceAsStream(name+TfUtils.TXMTD_RCD_MAP_SUFFIX)); if( map != null ) { @@ -139,16 +224,27 @@ public class TfMetaUtils String ndistinct = IOUtilFunctions.toString(Connection.class.getResourceAsStream(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX)); rows = Math.max(rows, Integer.parseInt(ndistinct)); } - else if( coltypes.get(j).equals("2") ) { - LOG.warn("Recode map for column '"+colName+"' does not exist."); + //read binning map for binned columns + String name2 = metapath+"/"+"Bin"+"/"+colName; + String map2 = IOUtilFunctions.toString(Connection.class.getResourceAsStream(name2+TfUtils.TXMTD_BIN_FILE_SUFFIX)); + if( map2 != null ) { + meta.put(colName, map2); + rows = Math.max(rows, Integer.parseInt(map2.split(TfUtils.TXMTD_SEP)[4])); + } + //read impute value for mv columns + String name3 = metapath+File.separator+"Impute"+File.separator+colName; + String map3 = 
IOUtilFunctions.toString(Connection.class.getResourceAsStream(name3+TfUtils.TXMTD_MV_FILE_SUFFIX)); + if( map3 != null ) { + mvmeta.put(colName, map3); } } //get list of recode ids - List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes); + List<Integer> recodeIDs = parseRecodeColIDs(spec); + List<Integer> binIDs = parseBinningColIDs(spec); //create frame block from in-memory strings - return convertToTransformMetaDataFrame(rows, recodeIDs, colnames, meta); + return convertToTransformMetaDataFrame(rows, colnames, recodeIDs, binIDs, meta, mvmeta); } /** @@ -161,7 +257,8 @@ public class TfMetaUtils * @return * @throws IOException */ - private static FrameBlock convertToTransformMetaDataFrame(int rows, List<Integer> recodeIDs, List<String> colnames, HashMap<String,String> meta) + private static FrameBlock convertToTransformMetaDataFrame(int rows, List<String> colnames, List<Integer> rcIDs, List<Integer> binIDs, + HashMap<String,String> meta, HashMap<String,String> mvmeta) throws IOException { //create frame block w/ pure string schema @@ -169,8 +266,8 @@ public class TfMetaUtils FrameBlock ret = new FrameBlock(schema, colnames); ret.ensureAllocatedColumns(rows); - //encode recode maps into frame - for( Integer colID : recodeIDs ) { + //encode recode maps (recoding/dummycoding) into frame + for( Integer colID : rcIDs ) { String name = colnames.get(colID-1); String map = meta.get(name); if( map == null ) @@ -179,12 +276,39 @@ public class TfMetaUtils InputStream is = new ByteArrayInputStream(map.getBytes("UTF-8")); BufferedReader br = new BufferedReader(new InputStreamReader(is)); Pair<String,String> pair = new Pair<String,String>(); - String line; int rpos = 0; + String line; int rpos = 0; while( (line = br.readLine()) != null ) { DecoderRecode.parseRecodeMapEntry(line, pair); String tmp = pair.getKey() + Lop.DATATYPE_PREFIX + pair.getValue(); ret.set(rpos++, colID-1, tmp); } + ret.getColumnMetadata(colID-1).setNumDistinct((long)rpos); + } + + //encode bin maps 
(binning) into frame + for( Integer colID : binIDs ) { + String name = colnames.get(colID-1); + String map = meta.get(name); + if( map == null ) + throw new IOException("Binning map for column '"+name+"' (id="+colID+") not existing."); + String[] fields = map.split(TfUtils.TXMTD_SEP); + double min = UtilFunctions.parseToDouble(fields[1]); + double binwidth = UtilFunctions.parseToDouble(fields[3]); + int nbins = UtilFunctions.parseToInt(fields[4]); + //materialize bins to support equi-width/equi-height + for( int i=0; i<nbins; i++ ) { + String lbound = String.valueOf(min+i*binwidth); + String ubound = String.valueOf(min+(i+1)*binwidth); + ret.set(i, colID-1, lbound+Lop.DATATYPE_PREFIX+ubound); + } + ret.getColumnMetadata(colID-1).setNumDistinct((long)nbins); + } + + //encode impute meta data into frame + for( Entry<String, String> e : mvmeta.entrySet() ) { + int colID = colnames.indexOf(e.getKey()) + 1; + String mvVal = e.getValue().split(TfUtils.TXMTD_SEP)[1]; + ret.getColumnMetadata(colID-1).setMvValue(mvVal); } return ret; @@ -199,33 +323,23 @@ public class TfMetaUtils * @return * @throws IOException */ - private static ArrayList<Integer> parseRecodeColIDs(String spec, List<String> coltypes) + @SuppressWarnings("unchecked") + private static List<Integer> parseRecodeColIDs(String spec) throws IOException { - ArrayList<Integer> specRecodeIDs = new ArrayList<Integer>(); + if( spec == null ) + throw new IOException("Missing transform specification."); + + List<Integer> specRecodeIDs = null; try { - if( spec != null ) { - //parse json transform specification for recode col ids - JSONObject jSpec = new JSONObject(spec); - if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { - JSONArray attrs = null; //TODO simplify once json spec consolidated - if( jSpec.get(TfUtils.TXMETHOD_RECODE) instanceof JSONObject ) { - JSONObject obj = (JSONObject) jSpec.get(TfUtils.TXMETHOD_RECODE); - attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); - } - else - attrs = 
(JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE); - for(int j=0; j<attrs.length(); j++) - specRecodeIDs.add(UtilFunctions.toInt(attrs.get(j))); - } - } - else { - //obtain recode col ids from coltypes - for( int j=0; j<coltypes.size(); j++ ) - if( coltypes.get(j).equals("2") ) - specRecodeIDs.add(j+1); - } + //parse json transform specification for recode col ids + JSONObject jSpec = new JSONObject(spec); + List<Integer> rcIDs = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_RECODE))); + List<Integer> dcIDs = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_DUMMYCODE))); + specRecodeIDs = new ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs)); } catch(Exception ex) { throw new IOException(ex); @@ -233,4 +347,46 @@ public class TfMetaUtils return specRecodeIDs; } + + /** + * + * @param spec + * @return + * @throws IOException + */ + public static List<Integer> parseBinningColIDs(String spec) + throws IOException + { + try { + JSONObject jSpec = new JSONObject(spec); + return parseBinningColIDs(jSpec); + } + catch(JSONException ex) { + throw new IOException(ex); + } + } + + /** + * + * @param jSpec + * @return + * @throws IOException + */ + public static List<Integer> parseBinningColIDs(JSONObject jSpec) + throws IOException + { + try { + if( jSpec.containsKey(TfUtils.TXMETHOD_BIN) && jSpec.get(TfUtils.TXMETHOD_BIN) instanceof JSONArray ) { + return Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonObjectIDList(jSpec, TfUtils.TXMETHOD_BIN))); + } + else { //internally generates + return Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_BIN))); + } + } + catch(JSONException ex) { + throw new IOException(ex); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java new file mode 100644 index 0000000..a17287f --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.transform.meta; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.sysml.runtime.matrix.data.Pair; + +/** + * + */ +public class TfOffsetMap implements Serializable +{ + private static final long serialVersionUID = 8949124761287236703L; + + private HashMap<Long, Long> _map = null; + private long _rmRows = -1; + + public TfOffsetMap( List<Pair<Long,Long>> rmRows ) { + //sort input list of <key, rm rows per block> + TreeMap<Long, Long> tmap = new TreeMap<Long, Long>(); + for( Pair<Long, Long> pair : rmRows ) + tmap.put(pair.getKey(), pair.getValue()); + + //compute shifted keys and build hash table + _map = new HashMap<Long, Long>(); + long shift = 0; + for( Entry<Long, Long> e : tmap.entrySet() ) { + _map.put(e.getKey(), e.getKey()-shift); + shift += e.getValue(); + } + _rmRows = shift; + } + + /** + * + * @param oldKey + * @return + */ + public long getOffset(long oldKey) { + return _map.get(oldKey); + } + + /** + * + * @return + */ + public long getNumRmRows() { + return _rmRows; + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index e292b39..3711a1a 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -19,6 +19,9 @@ package org.apache.sysml.runtime.util; +import java.util.ArrayList; +import java.util.List; + import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData; @@ -441,4 +444,18 @@ public 
class UtilFunctions else return Long.parseLong(arg.substring(0,arg.length())); } + + /** + * + * @param low lower bound (inclusive) + * @param up upper bound (inclusive) + * @param incr increment + * @return + */ + public static List<Integer> getSequenceList(int low, int up, int incr) { + ArrayList<Integer> ret = new ArrayList<Integer>(); + for( int i=low; i<=up; i+=incr ) + ret.add(i); + return ret; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java index 1abe68a..2ce82f3 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java @@ -20,16 +20,22 @@ package org.apache.sysml.test.integration.functions.jmlc; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import org.junit.Assert; import org.junit.Test; +import org.apache.commons.lang3.ArrayUtils; import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.lops.Lop; +import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.test.integration.AutomatedTestBase; @@ -101,19 +107,19 @@ public class FrameReadMetaTest extends AutomatedTestBase //establish 
connection to SystemML Connection conn = new Connection(); - //read meta data frame + //read meta data frame String spec = MapReduceTool.readStringFromHDFSFile(SCRIPT_DIR + TEST_DIR+"tfmtd_example/spec.json"); FrameBlock M = readFrame ? DataConverter.convertToFrameBlock(conn.readStringFrame(SCRIPT_DIR + TEST_DIR+"tfmtd_frame_example/tfmtd_frame")) : - conn.readTransformMetaDataFromFile(useSpec ? spec : null, SCRIPT_DIR + TEST_DIR+"tfmtd_example/"); - - //generate data based on recode maps - HashMap<String,Long>[] RC = getRecodeMaps(M); - double[][] X = generateData(rows, cols, RC); - String[][] F = null; + conn.readTransformMetaDataFromFile(spec, SCRIPT_DIR + TEST_DIR+"tfmtd_example/"); try { + //generate data based on recode maps + HashMap<String,Long>[] RC = getRecodeMaps(spec, M); + double[][] X = generateData(rows, cols, RC); + String[][] F = null; + //prepare input arguments HashMap<String,String> args = new HashMap<String,String>(); args.put("$TRANSFORM_SPEC", spec); @@ -139,6 +145,17 @@ public class FrameReadMetaTest extends AutomatedTestBase //get output parameter F = rs.getFrame("F"); } + + + //check correct result + //for all generated data, probe recode maps and compare versus output + for( int i=0; i<rows; i++ ) + for( int j=0; j<cols; j++ ) + if( RC[j] != null ) { + Assert.assertEquals("Wrong result: "+F[i][j]+".", + Double.valueOf(X[i][j]), + Double.valueOf(RC[j].get(F[i][j]).toString())); + } } catch(Exception ex) { ex.printStackTrace(); @@ -149,31 +166,26 @@ public class FrameReadMetaTest extends AutomatedTestBase if( conn != null ) conn.close(); } - - //check correct result - //for all generated data, probe recode maps and compare versus output - for( int i=0; i<rows; i++ ) - for( int j=0; j<cols; j++ ) - if( RC[j] != null ) { - Assert.assertEquals("Wrong result: "+F[i][j]+".", - Double.valueOf(X[i][j]), - Double.valueOf(RC[j].get(F[i][j]).toString())); - } } /** * * @param M * @return + * @throws DMLRuntimeException */ 
@SuppressWarnings("unchecked") - private HashMap<String,Long>[] getRecodeMaps(FrameBlock M) { + private HashMap<String,Long>[] getRecodeMaps(String spec, FrameBlock M) + throws DMLRuntimeException + { + List<Integer> collist = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(spec, TfUtils.TXMETHOD_RECODE))); HashMap<String,Long>[] ret = new HashMap[M.getNumColumns()]; Iterator<Object[]> iter = M.getObjectRowIterator(); while( iter.hasNext() ) { Object[] tmp = iter.next(); for( int j=0; j<tmp.length; j++ ) - if( tmp[j] != null ) { + if( collist.contains(j+1) && tmp[j] != null ) { if( ret[j] == null ) ret[j] = new HashMap<String,Long>(); String[] parts = tmp[j].toString().split(Lop.DATATYPE_PREFIX); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java index e8837cc..6921b77 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java @@ -39,10 +39,14 @@ public class TransformFrameTest extends AutomatedTestBase private final static String TEST_DIR = "functions/transform/"; private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameTest.class.getSimpleName() + "/"; - private final static String DATASET = "homes3/homes.csv"; + //dataset and transform tasks without missing values + private final static String DATASET1 = "homes3/homes.csv"; private final static String SPEC1 = "homes3/homes.tfspec_recode.json"; private final static String SPEC2 = "homes3/homes.tfspec_dummy.json"; - private final static String SPEC3 = 
"homes3/homes.tfspec_bin.json"; + private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; //incl recode + + //dataset and transform tasks with missing values + private final static String DATASET2 = "homes/homes.csv"; private final static String SPEC4 = "homes3/homes.tfspec_impute.json"; private final static String SPEC5 = "homes3/homes.tfspec_omit.json"; @@ -70,6 +74,46 @@ public class TransformFrameTest extends AutomatedTestBase public void testHomesRecodeSparkCSV() { runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.RECODE); } + + @Test + public void testHomesDummycodeSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.DUMMY); + } + + @Test + public void testHomesDummycodeSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.DUMMY); + } + + @Test + public void testHomesBinningSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.BIN); + } + + @Test + public void testHomesBinningSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.BIN); + } + + @Test + public void testHomesOmitSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.OMIT); + } + + @Test + public void testHomesOmitSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.OMIT); + } + + @Test + public void testHomesImputeSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.IMPUTE); + } + + @Test + public void testHomesImputeSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.IMPUTE); + } /** * @@ -89,13 +133,13 @@ public class TransformFrameTest extends AutomatedTestBase DMLScript.USE_LOCAL_SPARK_CONFIG = true; //set transform specification - String SPEC = null; + String SPEC = null; String DATASET = null; switch( type ) { - case RECODE: SPEC = SPEC1; break; - case DUMMY: SPEC = SPEC2; break; - case BIN: SPEC = SPEC3; break; - case IMPUTE: SPEC = 
SPEC4; break; - case OMIT: SPEC = SPEC5; break; + case RECODE: SPEC = SPEC1; DATASET = DATASET1; break; + case DUMMY: SPEC = SPEC2; DATASET = DATASET1; break; + case BIN: SPEC = SPEC3; DATASET = DATASET1; break; + case IMPUTE: SPEC = SPEC4; DATASET = DATASET2; break; + case OMIT: SPEC = SPEC5; DATASET = DATASET2; break; } if( !ofmt.equals("csv") ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/java/org/apache/sysml/test/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/utils/TestUtils.java b/src/test/java/org/apache/sysml/test/utils/TestUtils.java index 4f97f7f..78ebf60 100644 --- a/src/test/java/org/apache/sysml/test/utils/TestUtils.java +++ b/src/test/java/org/apache/sysml/test/utils/TestUtils.java @@ -730,8 +730,10 @@ public class TestUtils int countErrors = 0; for (int i = 0; i < rows; i++) { for (int j = 0; j < cols; j++) { - if (!compareCellValue(expectedMatrix[i][j], actualMatrix[i][j], epsilon, false)) + if (!compareCellValue(expectedMatrix[i][j], actualMatrix[i][j], epsilon, false)) { + System.out.println(expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j); countErrors++; + } } } assertTrue("" + countErrors + " values are not in equal", countErrors == 0); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json new file mode 100644 index 0000000..c2200ae --- /dev/null +++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json @@ -0,0 +1,5 @@ +{ + "ids": true, "recode": [ 1, 2, 7 ], "bin": [ + { "id": 8 , "method": "equi-width", "numbins": 3 } + ,{ "id": 3, "method": "equi-width", 
"numbins": 4 }] + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json new file mode 100644 index 0000000..e398b7f --- /dev/null +++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json @@ -0,0 +1,2 @@ +{ + "ids": true, "dummycode": [ 2, 7, 1 ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json new file mode 100644 index 0000000..0757a03 --- /dev/null +++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json @@ -0,0 +1,10 @@ +{ + "ids": true, "impute": [ + { "id": 1, "method": "global_mode" } + ,{ "id": 2, "method": "constant", "value": "south" } + ,{ "id": 4, "method": "constant", "value": "2" } + ,{ "id": 5, "method": "constant", "value": "1" } + ,{ "id": 6, "method": "constant", "value": "1" } + ,{ "id": 7, "method": "global_mode" } + ,{ "id": 9, "method": "global_mean" } +], "recode": [ 2, 7 ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json new file mode 
100644 index 0000000..1611f07 --- /dev/null +++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json @@ -0,0 +1,2 @@ +{ + "ids": true, "omit": [ 1,2,4,5,6,7,8,9 ], "recode": [ 2, 7 ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java index 2bd12b2..dbf9302 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java @@ -30,6 +30,7 @@ import org.junit.runners.Suite; ScalingTest.class, TransformAndApplyTest.class, TransformEncodeDecodeTest.class, + TransformFrameTest.class, TransformReadMetaTest.class, TransformTest.class, })
