Repository: incubator-systemml
Updated Branches:
  refs/heads/master 55c8ee7d6 -> aa83a6f44


[SYSTEMML-569] Extended frame transform apply (dummy, bin, omit, impute)

This patch extends the supported frame transform apply functionality in
CP/Spark from recoding to recoding, dummy coding, value omitting, and
missing value imputation. Furthermore, this also includes new (isolated)
test cases and an extension of the meta data frame read utility.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8f7e8cca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8f7e8cca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8f7e8cca

Branch: refs/heads/master
Commit: 8f7e8cca76c8f4be6af69cc4001aab5ec977e3e6
Parents: 55c8ee7
Author: Matthias Boehm <[email protected]>
Authored: Fri Jun 17 21:19:09 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jun 17 22:39:24 2016 -0700

----------------------------------------------------------------------
 .../ParameterizedBuiltinSPInstruction.java      |  84 +++++-
 .../instructions/spark/utils/SparkUtils.java    |  12 +
 .../sysml/runtime/matrix/data/FrameBlock.java   |  82 +++++-
 .../sysml/runtime/transform/BinAgent.java       | 110 ++++++--
 .../sysml/runtime/transform/DataTransform.java  |   4 +-
 .../sysml/runtime/transform/DummycodeAgent.java |  69 +++--
 .../sysml/runtime/transform/MVImputeAgent.java  |  64 ++++-
 .../sysml/runtime/transform/OmitAgent.java      |  66 ++++-
 .../sysml/runtime/transform/RecodeAgent.java    |  40 +--
 .../apache/sysml/runtime/transform/TfUtils.java |  26 +-
 .../sysml/runtime/transform/encode/Encoder.java |  23 +-
 .../transform/encode/EncoderComposite.java      |  20 +-
 .../transform/encode/EncoderFactory.java        |  81 ++++--
 .../transform/encode/EncoderPassThrough.java    |  15 +-
 .../runtime/transform/meta/TfMetaUtils.java     | 278 +++++++++++++++----
 .../runtime/transform/meta/TfOffsetMap.java     |  72 +++++
 .../sysml/runtime/util/UtilFunctions.java       |  17 ++
 .../functions/jmlc/FrameReadMetaTest.java       |  50 ++--
 .../functions/transform/TransformFrameTest.java |  60 +++-
 .../org/apache/sysml/test/utils/TestUtils.java  |   4 +-
 .../input/homes3/homes.tfspec_bin.json          |   5 +
 .../input/homes3/homes.tfspec_dummy.json        |   2 +
 .../input/homes3/homes.tfspec_impute.json       |  10 +
 .../input/homes3/homes.tfspec_omit.json         |   2 +
 .../functions/transform/ZPackageSuite.java      |   1 +
 25 files changed, 948 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index ae77c35..0d45ae1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
@@ -70,11 +71,15 @@ import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTy
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
 import org.apache.sysml.runtime.transform.DataTransform;
+import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.transform.encode.EncoderFactory;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.transform.meta.TfOffsetMap;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
+
 public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction 
 {      
        protected HashMap<String,String> params;
@@ -419,18 +424,26 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        FrameBlock meta = 
sec.getFrameInput(params.get("meta"));                
                        MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(params.get("target"));
                        MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
-                       mcOut.setDimension(mcIn.getRows(), mcIn.getCols()); 
//TODO encoder awareness
                        
+                       //compute omit offset map for block shifts
+                       TfOffsetMap omap = null;
+                       if( TfMetaUtils.containsOmitSpec(params.get("spec")) ) {
+                               omap = new 
TfOffsetMap(SparkUtils.toIndexedLong(in.mapToPair(
+                                       new 
RDDTransformApplyOffsetFunction(params.get("spec"))).collect()));
+                       }
+                               
                        //create encoder broadcast (avoiding replication per 
task) 
                        Encoder encoder = 
EncoderFactory.createEncoder(params.get("spec"), 
                                        fo.getSchema(), 
(int)fo.getNumColumns(), meta);
+                       
mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), 
encoder.getNumCols()); 
                        Broadcast<Encoder> bmeta = 
sec.getSparkContext().broadcast(encoder);
-               
+                       Broadcast<TfOffsetMap> bomap = (omap!=null) ? 
sec.getSparkContext().broadcast(omap) : null;
+                       
                        //execute transform apply
                        JavaPairRDD<Long,FrameBlock> tmp = in
-                                       .mapValues(new 
RDDTransformApplyFunction(bmeta));
+                                       .mapToPair(new 
RDDTransformApplyFunction(bmeta, bomap));
                        JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
FrameRDDConverterUtils
-                                       .binaryBlockToMatrixBlock(tmp, mcIn, 
mcOut);
+                                       .binaryBlockToMatrixBlock(tmp, mcOut, 
mcOut);
                        
                        //set output and maintain lineage/output characteristics
                        sec.setRDDHandleForVariable(output.getName(), out);
@@ -685,26 +698,79 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
        /**
         * 
         */
-       public static class RDDTransformApplyFunction implements 
Function<FrameBlock,FrameBlock> 
+       public static class RDDTransformApplyFunction implements 
PairFunction<Tuple2<Long,FrameBlock>,Long,FrameBlock> 
        {
                private static final long serialVersionUID = 
5759813006068230916L;
                
                private Broadcast<Encoder> _bencoder = null;
+               private Broadcast<TfOffsetMap> _omap = null;
                
-               public RDDTransformApplyFunction(Broadcast<Encoder> bencoder) {
+               public RDDTransformApplyFunction(Broadcast<Encoder> bencoder, 
Broadcast<TfOffsetMap> omap) {
                        _bencoder = bencoder;
+                       _omap = omap;
                }
 
                @Override
-               public FrameBlock call(FrameBlock in) 
+               public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> 
in) 
                        throws Exception 
                {
+                       long key = in._1();
+                       FrameBlock blk = in._2();
+                       
                        //execute block transform apply
                        Encoder encoder = _bencoder.getValue();
-                       MatrixBlock tmp = encoder.apply(in, new 
MatrixBlock(in.getNumRows(), in.getNumColumns(), false));
+                       MatrixBlock tmp = encoder.apply(blk, new 
MatrixBlock(blk.getNumRows(), blk.getNumColumns(), false));
+                       
+                       //remap keys
+                       if( _omap != null ) {
+                               key = _omap.getValue().getOffset(key);
+                       }
                        
                        //convert to frameblock to reuse frame-matrix reblock
-                       return DataConverter.convertToFrameBlock(tmp);
+                       return new Tuple2<Long, FrameBlock>(key, 
+                                       DataConverter.convertToFrameBlock(tmp));
+               }
+       }
+       
+       /**
+        * 
+        */
+       public static class RDDTransformApplyOffsetFunction implements 
PairFunction<Tuple2<Long,FrameBlock>,Long,Long> 
+       {
+               private static final long serialVersionUID = 
3450977356721057440L;
+               
+               private int[] _omitColList = null;
+               
+               public RDDTransformApplyOffsetFunction(String spec) {
+                       try {
+                               _omitColList = 
TfMetaUtils.parseJsonIDList(spec, TfUtils.TXMETHOD_OMIT);
+                       } 
+                       catch (DMLRuntimeException e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public Tuple2<Long,Long> call(Tuple2<Long, FrameBlock> in) 
+                       throws Exception 
+               {
+                       long key = in._1();
+                       long rmRows = 0;
+                       
+                       FrameBlock blk = in._2();
+                       
+                       for( int i=0; i<blk.getNumRows(); i++ ) {
+                               boolean valid = true;
+                               for( int j=0; j<_omitColList.length; j++ ) {
+                                       int colID = _omitColList[j];
+                                       Object val = blk.get(i, colID-1);
+                                       valid &= !(val==null || 
(blk.getSchema().get(colID-1)==
+                                                       ValueType.STRING &&  
val.toString().isEmpty())); 
+                               }
+                               rmRows += valid ? 0 : 1;
+                       }
+                       
+                       return new Tuple2<Long, Long>(key, rmRows);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 34db095..98975da 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -117,6 +117,18 @@ public class SparkUtils
                return ret;
        }
        
+       /**
+        * 
+        * @param in
+        * @return
+        */
+       public static ArrayList<Pair<Long,Long>> toIndexedLong( 
List<Tuple2<Long, Long>> in ) {
+               ArrayList<Pair<Long, Long>> ret = new ArrayList<Pair<Long, 
Long>>();
+               for( Tuple2<Long, Long> e : in )
+                       ret.add(new Pair<Long,Long>(e._1(), e._2()));
+               return ret;
+       }
+       
        
        /**
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 4124c03..0065fe8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -62,6 +62,8 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
        /** The column names of the data frame as an ordered list of strings */
        private List<String> _colnames = null;
        
+       private List<ColumnMetadata> _colmeta = null;
+       
        /** The data frame data as an ordered list of columns */
        private List<Array> _coldata = null;
        
@@ -72,6 +74,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                _numRows = 0;
                _schema = new ArrayList<ValueType>();
                _colnames = new ArrayList<String>();
+               _colmeta = new ArrayList<ColumnMetadata>();
                _coldata = new ArrayList<Array>();
                if( REUSE_RECODE_MAPS )
                        _rcdMapCache = new HashMap<Integer, 
SoftReference<HashMap<String,Long>>>();
@@ -104,6 +107,9 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                _numRows = 0; //maintained on append
                _schema = new ArrayList<ValueType>(schema);
                _colnames = new ArrayList<String>(names);
+               _colmeta = new ArrayList<ColumnMetadata>();
+               for( int j=0; j<_schema.size(); j++ )
+                       _colmeta.add(new ColumnMetadata(0));
                _coldata = new ArrayList<Array>();
                for( int i=0; i<data.length; i++ )
                        appendRow(data[i]);
@@ -157,6 +163,40 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
        }
        
        /**
+        * 
+        * @return
+        */
+       public List<ColumnMetadata> getColumnMetadata() {
+               return _colmeta;
+       }
+       
+       /**
+        * 
+        * @param c
+        * @return
+        */
+       public ColumnMetadata getColumnMetadata(int c) {
+               return _colmeta.get(c);
+       }
+       
+       /**
+        * 
+        * @param colmeta
+        */
+       public void setColumnMetadata(List<ColumnMetadata> colmeta) {
+               _colmeta = colmeta;
+       }
+       
+       /**
+        * 
+        * @param c
+        * @param colmeta
+        */
+       public void setColumnMetadata(int c, ColumnMetadata colmeta) {
+               _colmeta.set(c, colmeta);
+       }
+       
+       /**
         * Creates a mapping from column names to column IDs, i.e., 
         * 1-based column indexes
         * 
@@ -177,6 +217,9 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                //early abort if already allocated
                if( _schema.size() == _coldata.size() ) 
                        return;         
+               //allocate column meta data
+               for( int j=0; j<_schema.size(); j++ )
+                       _colmeta.add(new ColumnMetadata(0));
                //allocate columns if necessary
                for( int j=0; j<_schema.size(); j++ ) {
                        if( j >= _coldata.size() )
@@ -420,6 +463,9 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                for( int j=0; j<getNumColumns(); j++ ) {
                        out.writeByte(_schema.get(j).ordinal());
                        out.writeUTF(_colnames.get(j));
+                       out.writeLong(_colmeta.get(j).getNumDistinct());
+                       out.writeUTF( (_colmeta.get(j).getMvValue()!=null) ? 
+                                       _colmeta.get(j).getMvValue() : "" );
                        _coldata.get(j).write(out);
                }
        }
@@ -429,12 +475,15 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                //read head (rows, cols)
                _numRows = in.readInt();
                int numCols = in.readInt();
-               //read columns (value type, data)
+               //read columns (value type, meta, data)
                _schema.clear();
+               _colmeta.clear();
                _coldata.clear();
                for( int j=0; j<numCols; j++ ) {
                        ValueType vt = ValueType.values()[in.readByte()];
                        String name = in.readUTF();
+                       long ndistinct = in.readLong();
+                       String mvvalue = in.readUTF();
                        Array arr = null;
                        switch( vt ) {
                                case STRING:  arr = new StringArray(new 
String[_numRows]); break;
@@ -446,6 +495,8 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        arr.readFields(in);
                        _schema.add(vt);
                        _colnames.add(name);
+                       _colmeta.add(new ColumnMetadata(ndistinct, 
+                                       mvvalue.isEmpty() ? null : mvvalue));
                        _coldata.add(arr);
                }
        }
@@ -1067,4 +1118,33 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        return new 
DoubleArray(Arrays.copyOfRange(_data,rl,ru+1));
                }
        }
+       
+       /**
+        * 
+        */
+       public static class ColumnMetadata {
+               private long _ndistinct = -1;
+               private String _mvValue = null;
+               
+               public ColumnMetadata(long ndistinct, String mvval) {
+                       _ndistinct = ndistinct;
+                       _mvValue = mvval;
+               }
+               public ColumnMetadata(long ndistinct) {
+                       _ndistinct = ndistinct;
+               }
+
+               public long getNumDistinct() {
+                       return _ndistinct;
+               }               
+               public void setNumDistinct(long ndistinct) {
+                       _ndistinct = ndistinct;
+               }
+               public String getMvValue() {
+                       return _mvValue;
+               }
+               public void setMvValue(String mvVal) {
+                       _mvValue = mvVal;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index 7e6c9c8..3ed9a1d 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -28,7 +28,9 @@ import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -37,11 +39,13 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class BinAgent extends Encoder 
@@ -56,33 +60,57 @@ public class BinAgent extends Encoder
        private double[] _min=null, _max=null;  // min and max among 
non-missing values
        private double[] _binWidths = null;             // width of a bin for 
each attribute
        
-       public BinAgent() {
-               super( null );
+       //frame transform-apply attributes
+       private double[][] _binMins = null;
+       private double[][] _binMaxs = null;
+       
+       public BinAgent(int clen) {
+               super( null, clen );
+       }
+       
+       public BinAgent(JSONObject parsedSpec, int clen) 
+               throws JSONException, IOException 
+       {
+               this(parsedSpec, clen, false);
        }
        
-       public BinAgent(JSONObject parsedSpec) 
-               throws JSONException 
+       /**
+        * 
+        * @param parsedSpec
+        * @param clen
+        * @throws JSONException
+        * @throws IOException 
+        */
+       public BinAgent(JSONObject parsedSpec, int clen, boolean colsOnly) 
+               throws JSONException, IOException 
        {
-               super( null );          
+               super( null, clen );            
                if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
                        return;
                
-               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_BIN);             
-               JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
-               JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
+               if( colsOnly ) {
+                       List<Integer> collist = 
TfMetaUtils.parseBinningColIDs(parsedSpec);
+                       initColList(ArrayUtils.toPrimitive(collist.toArray(new 
Integer[0])));
+               }
+               else 
+               {
+                       JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_BIN);             
+                       JSONArray attrs = (JSONArray) 
obj.get(TfUtils.JSON_ATTRS);
+                       JSONArray nbins = (JSONArray) 
obj.get(TfUtils.JSON_NBINS);
+                       initColList(attrs);
                        
-               initColList(attrs);
-               _numBins = new int[attrs.size()];
-               for(int i=0; i < _numBins.length; i++)
-                       _numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
-               
-               // initialize internal transformation metadata
-               _min = new double[_colList.length];
-               Arrays.fill(_min, Double.MAX_VALUE);
-               _max = new double[_colList.length];
-               Arrays.fill(_max, -Double.MAX_VALUE);
-               
-               _binWidths = new double[_colList.length];
+                       _numBins = new int[attrs.size()];
+                       for(int i=0; i < _numBins.length; i++)
+                               _numBins[i] = 
UtilFunctions.toInt(nbins.get(i)); 
+                       
+                       // initialize internal transformation metadata
+                       _min = new double[_colList.length];
+                       Arrays.fill(_min, Double.MAX_VALUE);
+                       _max = new double[_colList.length];
+                       Arrays.fill(_max, -Double.MAX_VALUE);
+                       
+                       _binWidths = new double[_colList.length];
+               }
        }
 
        public int[] getNumBins() { return _numBins; }
@@ -173,7 +201,7 @@ public class BinAgent extends Encoder
 
        private void writeTfMtd(int colID, String min, String max, String 
binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws 
IOException 
        {
-               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
TfUtils.BIN_FILE_SUFFIX);
+               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
TfUtils.TXMTD_BIN_FILE_SUFFIX);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + 
max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n");
                br.close();
@@ -263,7 +291,7 @@ public class BinAgent extends Encoder
                        for(int i=0; i<_colList.length;i++) {
                                int colID = _colList[i];
                                
-                               Path path = new Path( txMtdDir + "/Bin/" + 
agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX);
+                               Path path = new Path( txMtdDir + "/Bin/" + 
agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
                                TfUtils.checkValidInputFile(fs, path, true); 
                                        
                                BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
@@ -320,7 +348,17 @@ public class BinAgent extends Encoder
 
        @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
-               return null;
+               for(int j=0; j<_colList.length; j++) {
+                       int colID = _colList[j];
+                       for( int i=0; i<in.getNumRows(); i++ ) {
+                               double inVal = UtilFunctions.objectToDouble(
+                                               in.getSchema().get(colID-1), 
in.get(i, colID-1));
+                               int ix = Arrays.binarySearch(_binMaxs[j], 
inVal);
+                               int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 
1;               
+                               out.quickSetValue(i, colID-1, binID);
+                       }       
+               }
+               return out;
        }
        
        @Override
@@ -331,20 +369,18 @@ public class BinAgent extends Encoder
 
        @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               // TODO Auto-generated method stub
-               return null;
+               build(in);
+               return apply(in, out);
        }
 
        @Override
        public void build(String[] in) {
                // TODO Auto-generated method stub
-               
        }
 
        @Override
        public void build(FrameBlock in) {
                // TODO Auto-generated method stub
-               
        }
 
        @Override
@@ -352,4 +388,24 @@ public class BinAgent extends Encoder
                // TODO Auto-generated method stub
                return null;
        }
+       
+       /**
+        * 
+        * @param meta
+        */
+       public void initBins(FrameBlock meta) {
+               _binMins = new double[_colList.length][];
+               _binMaxs = new double[_colList.length][];
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       int nbins = 
(int)meta.getColumnMetadata().get(colID-1).getNumDistinct();
+                       _binMins[j] = new double[nbins];
+                       _binMaxs[j] = new double[nbins];
+                       for( int i=0; i<nbins; i++ ) {
+                               String[] tmp = meta.get(i, 
colID-1).toString().split(Lop.DATATYPE_PREFIX);
+                               _binMins[j][i] = Double.parseDouble(tmp[0]);
+                               _binMaxs[j][i] = Double.parseDouble(tmp[1]);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java 
b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
index 9767e50..c17b662 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
@@ -678,7 +678,7 @@ public class DataTransform
                {
                        int id = UtilFunctions.toInt(o);
                        
-                       Path binpath = new Path( tfMtdPath + "/Bin/" + 
UtilFunctions.unquote(columnNames[id-1]) + TfUtils.BIN_FILE_SUFFIX);
+                       Path binpath = new Path( tfMtdPath + "/Bin/" + 
UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
                        Path rcdpath = new Path( tfMtdPath + "/Recode/" + 
UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_RCD_DISTINCT_SUFFIX);
                        
                        if ( TfUtils.checkValidInputFile(fs, binpath, false ) )
@@ -698,7 +698,7 @@ public class DataTransform
                        else
                                throw new DMLRuntimeException("Relevant 
transformation metadata for column (id=" + id + ", name=" + columnNames[id-1] + 
") is not found.");
                }
-               //System.out.println("Number of columns in transformed data: " 
+ ret);
+               
                return ret;
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
index a25444d..bcb06df 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
@@ -51,8 +51,6 @@ public class DummycodeAgent extends Encoder
 {              
        private static final long serialVersionUID = 5832130477659116489L;
 
-       private long numCols = 0;
-       
        private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
        private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
        private int[] _binList = null;
@@ -62,18 +60,22 @@ public class DummycodeAgent extends Encoder
        private int[] _dcdColumnMap = null;                     // to help in 
translating between original and dummycoded column IDs
        private long _dummycodedLength = 0;                     // #of columns 
after dummycoded
        
-       public DummycodeAgent(int[] list) {
-               super(list);
+       public DummycodeAgent(int[] list, int clen) {
+               super(list, clen);
        }
        
-       public DummycodeAgent(JSONObject parsedSpec, long ncol) throws 
JSONException {
-               super(null);
-               numCols = ncol;
+       public DummycodeAgent(JSONObject parsedSpec, int clen) throws 
JSONException {
+               super(null, clen);
                
-               if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) )
-                       return;         
-               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_DUMMYCODE);
-               initColList( (JSONArray)obj.get(TfUtils.JSON_ATTRS) );  
+               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) {
+                       int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, 
TfUtils.TXMETHOD_DUMMYCODE);
+                       initColList(collist);
+               }
+       }
+       
+       @Override
+       public int getNumCols() {
+               return (int)_dummycodedLength;
        }
        
        /**
@@ -335,7 +337,7 @@ public class DummycodeAgent extends Encoder
        @Override
        public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
                if ( !isApplicable() ) {
-                       _dummycodedLength = numCols;
+                       _dummycodedLength = _clen;
                        return;
                }
                
@@ -343,7 +345,7 @@ public class DummycodeAgent extends Encoder
                Arrays.sort(_colList);  
                _domainSizes = new int[_colList.length];
 
-               _dummycodedLength = numCols;
+               _dummycodedLength = _clen;
                
                //HashMap<String, String> map = null;
                for(int i=0; i<_colList.length; i++) {
@@ -418,30 +420,63 @@ public class DummycodeAgent extends Encoder
        }
        
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
-               return null;
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) 
+       {
+               MatrixBlock ret = new MatrixBlock(out.getNumRows(), 
(int)_dummycodedLength, false);
+               
+               for( int i=0; i<out.getNumRows(); i++ ) {
+                       for(int colID=1, idx=0, ncolID=1; colID <= 
out.getNumColumns(); colID++) {
+                               double val = out.quickGetValue(i, colID-1);
+                               if(idx < _colList.length && 
colID==_colList[idx]) {
+                                       ret.quickSetValue(i, 
ncolID-1+(int)val-1, 1);
+                                       ncolID += _domainSizes[idx];
+                                       idx++;
+                               }
+                               else {
+                                       double ptval = 
UtilFunctions.objectToDouble(in.getSchema().get(colID-1), in.get(i, colID-1));
+                                       ret.quickSetValue(i, ncolID-1, ptval);
+                                       ncolID++;
+                               }
+                       }
+               }
+               
+               return ret;
        }
 
        @Override
        public double[] encode(String[] in, double[] out) {
+               //TODO
                return null;
        }
 
        @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               return null;
+               return apply(in, out);
        }
 
        @Override
        public void build(String[] in) {
+               //do nothing
        }
 
        @Override
        public void build(FrameBlock in) {
+               //do nothing
        }
 
        @Override
        public FrameBlock getMetaData(FrameBlock out) {
                return null;
        }
+       
+       public void initDomainSizes(FrameBlock meta) {
+               //initialize domain sizes and output num columns
+               _domainSizes = new int[_colList.length];
+               _dummycodedLength = _clen;
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       _domainSizes[j] = 
(int)meta.getColumnMetadata().get(colID-1).getNumDistinct();
+                       _dummycodedLength +=  _domainSizes[j];
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
index da3887d..1aeb465 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
 import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class MVImputeAgent extends Encoder 
@@ -102,10 +104,19 @@ public class MVImputeAgent extends Encoder
        public KahanObject[] getMeans_scnomv()   { return _scnomvMeanList; }
        public CM_COV_Object[] getVars_scnomv()  { return _scnomvVarList; }
        
-       public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings)
+       public MVImputeAgent(JSONObject parsedSpec, int clen) 
+               throws JSONException
+       {
+               super(null, clen);
+               int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, 
TfUtils.TXMETHOD_IMPUTE);
+               initColList(collist);
+       
+       }
+                       
+       public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings, int 
clen)
                throws JSONException 
        {
-               super(null);    
+               super(null, clen);      
                boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE);
                boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE);  
        
                _NAstrings = NAstrings;
@@ -518,7 +529,7 @@ public class MVImputeAgent extends Encoder
        
        private void writeTfMtd(int colID, String mean, String tfMtdDir, 
FileSystem fs, TfUtils agents) throws IOException 
        {
-               Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + 
TfUtils.MV_FILE_SUFFIX);
+               Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + 
TfUtils.TXMTD_MV_FILE_SUFFIX);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                br.write(colID + TfUtils.TXMTD_SEP + mean + "\n");
                br.close();
@@ -534,7 +545,7 @@ public class MVImputeAgent extends Encoder
        
        private void writeTfMtd(int colID, String min, String max, String 
binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws 
IOException 
        {
-               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
TfUtils.BIN_FILE_SUFFIX);
+               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
TfUtils.TXMTD_BIN_FILE_SUFFIX);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + 
max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n");
                br.close();
@@ -799,7 +810,7 @@ public class MVImputeAgent extends Encoder
 
        private String readReplacement(int colID, FileSystem fs, Path  
txMtdDir, TfUtils agents) throws IOException
        {
-               Path path = new Path( txMtdDir + "/Impute/" + 
agents.getName(colID) + TfUtils.MV_FILE_SUFFIX);
+               Path path = new Path( txMtdDir + "/Impute/" + 
agents.getName(colID) + TfUtils.TXMTD_MV_FILE_SUFFIX);
                TfUtils.checkValidInputFile(fs, path, true); 
                
                BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
@@ -915,11 +926,6 @@ public class MVImputeAgent extends Encoder
                return words;
        }
        
-       @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
-               return null;
-       }
-       
        public MVMethod getMethod(int colID) {
                int idx = isApplicable(colID);          
                if(idx == -1)
@@ -944,6 +950,20 @@ public class MVImputeAgent extends Encoder
                int idx = isApplicable(colID);          
                return (idx == -1) ? null : _replacementList[idx];
        }
+       
+
+       @Override
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+               for(int i=0; i<in.getNumRows(); i++) {
+                       for(int j=0; j<_colList.length; j++) {
+                               int colID = _colList[j];
+                               if( Double.isNaN(out.quickGetValue(i, colID-1)) 
)
+                                       out.quickSetValue(i, colID-1, 
Double.parseDouble(_replacementList[j]));
+                       }
+               }
+               return out;
+       }
+       
        @Override
        public double[] encode(String[] in, double[] out) {
                // TODO Auto-generated method stub
@@ -969,4 +989,28 @@ public class MVImputeAgent extends Encoder
                // TODO Auto-generated method stub
                return null;
        }
+       
+       /**
+        * 
+        * @param meta
+        * @param rcList
+        */
+       public void initReplacementList(FrameBlock meta, List<Integer> rcList) {
+               //init replacement lists, replace recoded values to
+               //apply mv imputation potentially after recoding
+               _replacementList = new String[_colList.length];
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j];
+                       String mvVal = 
UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); 
+                       if( rcList.contains(colID) ) {
+                               Long mvVal2 = 
meta.getRecodeMap(colID-1).get(mvVal);
+                               if( mvVal2 == null) 
+                                       throw new RuntimeException("Missing 
recode value for impute value '"+mvVal+"'.");
+                               _replacementList[j] = mvVal2.toString();
+                       }
+                       else {
+                               _replacementList[j] = mvVal;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
index b83cbaa..539ff30 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
@@ -27,34 +27,40 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class OmitAgent extends Encoder 
 {      
        private static final long serialVersionUID = 1978852120416654195L;
 
-       public OmitAgent() { 
-               super(null);
+       private int _rmRows = 0;
+       
+       public OmitAgent(int clen) { 
+               super(null, clen);
        }
        
-       public OmitAgent(int[] list) {
-               super(list);
+       public OmitAgent(int[] list, int clen) {
+               super(list, clen);
        }
        
-       public OmitAgent(JSONObject parsedSpec) 
+       public OmitAgent(JSONObject parsedSpec, int clen) 
                throws JSONException 
        {
-               super(null);
+               super(null, clen);
                if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT))
                        return;
-               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_OMIT);
-               initColList((JSONArray) obj.get(TfUtils.JSON_ATTRS));
+               int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, 
TfUtils.TXMETHOD_OMIT);
+               initColList(collist);
+       }
+       
+       public int getNumRemovedRows() {
+               return _rmRows;
        }
        
        public boolean omit(String[] words, TfUtils agents) 
@@ -93,8 +99,37 @@ public class OmitAgent extends Encoder
        }
        
        @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
-               return null;
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) 
+       {
+               //determine output size
+               int numRows = 0;
+               for(int i=0; i<out.getNumRows(); i++) {
+                       boolean valid = true;
+                       for(int j=0; j<_colList.length; j++)
+                               valid &= !Double.isNaN(out.quickGetValue(i, 
_colList[j]-1));
+                       numRows += valid ? 1 : 0;
+               }
+               
+               //copy over valid rows into the output
+               MatrixBlock ret = new MatrixBlock(numRows, out.getNumColumns(), 
false);
+               int pos = 0;
+               for(int i=0; i<in.getNumRows(); i++) {
+                       //determine if valid row or omit
+                       boolean valid = true;
+                       for(int j=0; j<_colList.length; j++)
+                               valid &= !Double.isNaN(out.quickGetValue(i, 
_colList[j]-1));
+                       //copy row if necessary
+                       if( valid ) {
+                               for(int j=0; j<out.getNumColumns(); j++)
+                                       ret.quickSetValue(pos, j, 
out.quickGetValue(i, j));
+                               pos++;
+                       }
+               }
+       
+       //keep info on removed rows
+               _rmRows = out.getNumRows() - pos;
+               
+               return ret; 
        }
 
        @Override
@@ -104,22 +139,23 @@ public class OmitAgent extends Encoder
 
        @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               return null;
+               return apply(in, out);
        }
 
        @Override
        public void build(String[] in) {
-               
+               //do nothing
        }
 
        @Override
        public void build(FrameBlock in) {      
-       
+               //do nothing
        }
 
        @Override
        public FrameBlock getMetaData(FrameBlock out) {
-               return null;
+               //do nothing
+               return out;
        }
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 6170412..a44ad5a 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -43,8 +43,8 @@ import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.transform.decode.DecoderRecode;
 import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
@@ -59,37 +59,23 @@ public class RecodeAgent extends Encoder
        private HashMap<Integer, HashMap<String, Long>> _rcdMaps  = new 
HashMap<Integer, HashMap<String, Long>>();
        private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
        
-       public RecodeAgent(JSONObject parsedSpec)
+       public RecodeAgent(JSONObject parsedSpec, int clen)
                throws JSONException 
        {
-               super(null);
-               int rcdCount = 0;       
-               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE)) 
-               {
-                       //TODO consolidate external and internal json spec 
definitions
-                       JSONArray attrs = null;
-                       if( parsedSpec.get(TfUtils.TXMETHOD_RECODE) instanceof 
JSONObject ) {
-                               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_RECODE);
-                               attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
-                       }
-                       else
-                               attrs = 
(JSONArray)parsedSpec.get(TfUtils.TXMETHOD_RECODE);                     
-                       rcdCount = initColList(attrs);
+               super(null, clen);
+               int rcdCount = 0;
+               
+               if( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) {
+                       int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, 
TfUtils.TXMETHOD_RECODE);
+                       rcdCount = initColList(collist);
                }
                
-               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) 
-               {
-                       JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_MVRCD);
-                       JSONArray attrs = (JSONArray) 
obj.get(TfUtils.JSON_ATTRS);
-                       
-                       _mvrcdList = new int[attrs.size()];
-                       for(int i=0; i < _mvrcdList.length; i++) 
-                               _mvrcdList[i] = 
UtilFunctions.toInt(attrs.get(i));
-                       rcdCount += attrs.size();
+               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) {
+                       _mvrcdList = TfMetaUtils.parseJsonIDList(parsedSpec, 
TfUtils.TXMETHOD_MVRCD);
+                       rcdCount += _mvrcdList.length;
                }
                
-               if ( rcdCount > 0 )
-               {
+               if ( rcdCount > 0 ) {
                        _fullrcdList = new int[rcdCount];
                        int idx = -1;
                        if(_colList != null)
@@ -305,7 +291,7 @@ public class RecodeAgent extends Encoder
                
                if (isModeImputed) 
                {
-                       pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) 
+ TfUtils.MV_FILE_SUFFIX);
+                       pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) 
+ TfUtils.TXMTD_MV_FILE_SUFFIX);
                        br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                        br.write(colID + "," + UtilFunctions.quote(mode));
                        br.close();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java 
b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 16ccf6e..24bde22 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -83,20 +83,20 @@ public class TfUtils implements Serializable{
        public static final String TXMETHOD_MVRCD     = "mvrcd";
                
        //transform meta data constants
-       public static final String TXMTD_SEP         = ",";
-       public static final String TXMTD_COLTYPES        = "coltypes.csv";      
+       public static final String TXMTD_SEP         = ",";
+       public static final String TXMTD_COLTYPES    = "coltypes.csv";  
        public static final String TXMTD_COLNAMES    = "column.names";
        public static final String TXMTD_DC_COLNAMES = 
"dummycoded.column.names";       
-       public static final String TXMTD_RCD_MAP_SUFFIX          = ".map";
+       public static final String TXMTD_RCD_MAP_SUFFIX      = ".map";
        public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct";
+       public static final String TXMTD_BIN_FILE_SUFFIX     = ".bin";
+       public static final String TXMTD_MV_FILE_SUFFIX      = ".impute";
        
        public static final String JSON_ATTRS   = "attributes"; 
        public static final String JSON_MTHD    = "methods"; 
        public static final String JSON_CONSTS = "constants"; 
        public static final String JSON_NBINS   = "numbins";            
-       protected static final String MV_FILE_SUFFIX            = ".impute";
        protected static final String MODE_FILE_SUFFIX          = ".mode";
-       protected static final String BIN_FILE_SUFFIX           = ".bin";
        protected static final String SCALE_FILE_SUFFIX         = ".scale";
        protected static final String DCD_FILE_NAME             = 
"dummyCodeMaps.csv";  
        protected static final String DCD_NAME_SEP      = "_";
@@ -119,7 +119,7 @@ public class TfUtils implements Serializable{
        private String _delimString = null;
        private String[] _NAstrings = null;
        private String[] _outputColumnNames = null;
-       private long _numInputCols = -1;
+       private int _numInputCols = -1;
        
        private String _tfMtdDir = null;
        private String _spec = null;
@@ -135,7 +135,7 @@ public class TfUtils implements Serializable{
                }               
                _NAstrings = TfUtils.parseNAStrings(job);
                _spec = job.get(MRJobConfiguration.TF_SPEC);
-               _oa = new OmitAgent(new JSONObject(_spec));
+               _oa = new OmitAgent(new JSONObject(_spec), -1);
        }
        
        // called from GenTFMtdMapper, ApplyTf (Hadoop)
@@ -176,7 +176,7 @@ public class TfUtils implements Serializable{
                throws IOException, JSONException 
        {
                //TODO recodemaps handover
-               _numInputCols = inNcol;
+               _numInputCols = (int)inNcol;
                createAgents(spec, new String[]{});
        }
        
@@ -244,10 +244,10 @@ public class TfUtils implements Serializable{
        private void createAgents(JSONObject spec, String[] naStrings) 
                throws IOException, JSONException 
        {
-               _oa = new OmitAgent(spec);
-               _mia = new MVImputeAgent(spec, naStrings);
-               _ra = new RecodeAgent(spec);
-               _ba = new BinAgent(spec);
+               _oa = new OmitAgent(spec, _numInputCols);
+               _mia = new MVImputeAgent(spec, naStrings, _numInputCols);
+               _ra = new RecodeAgent(spec, _numInputCols);
+               _ba = new BinAgent(spec, _numInputCols);
                _da = new DummycodeAgent(spec, _numInputCols);
        }
        
@@ -279,7 +279,7 @@ public class TfUtils implements Serializable{
                _delimString = delim;
                _delim = Pattern.compile(Pattern.quote(delim));
                _NAstrings = naStrings;
-               _numInputCols = numCols;
+               _numInputCols = (int)numCols;
                _offsetFile = offsetFile;
                _tmpDir = tmpPath;
                _outputPath = outputPath;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
index 98684da..ac01357 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
@@ -45,16 +45,27 @@ public abstract class Encoder implements Serializable
 {
        private static final long serialVersionUID = 2299156350718979064L;
        
+       protected int _clen = -1; 
        protected int[] _colList = null;
        
-       protected Encoder( int[] colList ) {
+       protected Encoder( int[] colList, int clen ) {
                _colList = colList;
+               _clen = clen;
        }
        
        public int[] getColList() {
                return _colList;
        }
        
+       public void setColList(int[] colList) {
+               _colList = colList;
+       }
+       
+       public int getNumCols() {
+               return _clen;
+       }
+       
+       
        /**
         * 
         * @param attrs
@@ -67,6 +78,16 @@ public abstract class Encoder implements Serializable
        }
        
        /**
+        * 
+        * @param attrs
+        * @return
+        */
+       public int initColList(int[] colList) {
+               _colList = colList;
+               return _colList.length;
+       }
+       
+       /**
         * Indicates if this encoder is applicable, i.e, if there is at 
         * least one column to encode. 
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index b3477ea..1c27350 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -47,26 +47,34 @@ public class EncoderComposite extends Encoder
        private List<Encoder> _encoders = null;
        
        protected EncoderComposite(List<Encoder> encoders) {
-               super(null);
+               super(null, -1);
                _encoders = encoders;
        }
        
        protected EncoderComposite(Encoder[] encoders) {
-               super(null);
+               super(null, -1);
                _encoders = Arrays.asList(encoders);
        }
-
+       
+       @Override
+       public int getNumCols() {
+               int clen = 0;
+               for( Encoder encoder : _encoders )
+                       clen = Math.max(clen, encoder.getNumCols());
+               return clen;
+       }
+       
        @Override
        public double[] encode(String[] in, double[] out) {
                for( Encoder encoder : _encoders )
-                       encoder.encode(in, out);
+                       out = encoder.encode(in, out);
                return out;
        }
 
        @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
                for( Encoder encoder : _encoders )
-                       encoder.encode(in, out);
+                       out = encoder.encode(in, out);
                return out;
        }
 
@@ -99,7 +107,7 @@ public class EncoderComposite extends Encoder
        @Override 
        public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
                for( Encoder encoder : _encoders )
-                       encoder.apply(in, out);
+                       out = encoder.apply(in, out);
                return out;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
index 75fe639..c3959ca 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -20,15 +20,23 @@
 package org.apache.sysml.runtime.transform.encode;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.transform.BinAgent;
+import org.apache.sysml.runtime.transform.DummycodeAgent;
+import org.apache.sysml.runtime.transform.MVImputeAgent;
+import org.apache.sysml.runtime.transform.OmitAgent;
 import org.apache.sysml.runtime.transform.RecodeAgent;
 import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.wink.json4j.JSONObject;
 
 public class EncoderFactory 
@@ -66,43 +74,66 @@ public class EncoderFactory
         * @return
         * @throws DMLRuntimeException
         */
+       @SuppressWarnings("unchecked")
        public static Encoder createEncoder(String spec, List<ValueType> 
schema, FrameBlock meta) 
                throws DMLRuntimeException 
        {       
                Encoder encoder = null;
+               int clen = schema.size();
                
                try {
                        //parse transform specification
                        JSONObject jSpec = new JSONObject(spec);
                        List<Encoder> lencoders = new ArrayList<Encoder>();
                
-                       //create encoders 'recode' and 'pass-through'
-                       if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE))  {
-                               RecodeAgent ra = new RecodeAgent(jSpec);
+                       //prepare basic id lists (recode, dummycode, 
pass-through)
+                       //note: any dummycode column requires recode as 
preparation
+                       List<Integer> rcIDs = Arrays.asList(ArrayUtils.toObject(
+                                       TfMetaUtils.parseJsonIDList(jSpec, 
TfUtils.TXMETHOD_RECODE)));
+                       List<Integer> dcIDs = Arrays.asList(ArrayUtils.toObject(
+                                       TfMetaUtils.parseJsonIDList(jSpec, 
TfUtils.TXMETHOD_DUMMYCODE))); 
+                       rcIDs = new 
ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs));
+                       List<Integer> binIDs = 
TfMetaUtils.parseBinningColIDs(jSpec); 
+                       List<Integer> ptIDs = new 
ArrayList<Integer>(CollectionUtils.subtract(
+                                       
CollectionUtils.subtract(UtilFunctions.getSequenceList(1, clen, 1), rcIDs), 
binIDs)); 
+                       List<Integer> oIDs = Arrays.asList(ArrayUtils.toObject(
+                                       TfMetaUtils.parseJsonIDList(jSpec, 
TfUtils.TXMETHOD_OMIT))); 
+                       List<Integer> mvIDs = Arrays.asList(ArrayUtils.toObject(
+                                       
TfMetaUtils.parseJsonObjectIDList(jSpec, TfUtils.TXMETHOD_IMPUTE))); 
+                       
+                       //create individual encoders
+                       if( !rcIDs.isEmpty() ) {
+                               RecodeAgent ra = new RecodeAgent(jSpec, clen);
+                               
ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0])));
                                if( meta != null )
                                        ra.initRecodeMaps(meta);
-                               lencoders.add(ra);
-                       
-                               //pass-through decode (non-recode columns)
-                               int[] rcCols = ra.getColList();
-                               if( schema.size() > rcCols.length ) {
-                                       int[] ptCols = new 
int[schema.size()-rcCols.length]; 
-                                       HashSet<Integer> probe = new 
HashSet<Integer>();
-                                       for( int j=0; j<rcCols.length; j++ )
-                                               probe.add(rcCols[j]-1);
-                                       for( int j=0, pos=0; j<schema.size(); 
j++ )
-                                               if( !probe.contains(j) )
-                                                       ptCols[pos++] = j;
-                                       lencoders.add(new 
EncoderPassThrough(ptCols));  
-                               }
+                               lencoders.add(ra);      
+                       }
+                       if( !ptIDs.isEmpty() ) {
+                               lencoders.add(new EncoderPassThrough(
+                                               
ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen));  
+                       }
+                       if( !dcIDs.isEmpty() ) {
+                               DummycodeAgent da = new DummycodeAgent(jSpec, 
schema.size());
+                               if( meta != null )
+                                       da.initDomainSizes(meta);
+                               lencoders.add(da);
+                       }
+                       if( !binIDs.isEmpty() ) {
+                               BinAgent ba = new BinAgent(jSpec, 
schema.size(), true);
+                               if( meta != null )
+                                       ba.initBins(meta);
+                               lencoders.add(ba);
+                       }
+                       if( !oIDs.isEmpty() ) {
+                               lencoders.add(new OmitAgent(jSpec, 
schema.size()));
+                       }
+                       if( !mvIDs.isEmpty() ) {
+                               MVImputeAgent ma = new MVImputeAgent(jSpec, 
schema.size());
+                               if( meta != null )
+                                       ma.initReplacementList(meta, rcIDs);
+                               lencoders.add(ma);
                        }
-                       //create full 'pass-through' encoder if necessary
-                       else {
-                               int[] ptCols = new int[schema.size()];
-                               for( int j=0; j<ptCols.length; j++ )
-                                       ptCols[j] = j;
-                               lencoders.add(new EncoderPassThrough(ptCols));
-                       }       
                        
                        //create composite encoder of all created encoders
                        encoder = new EncoderComposite(lencoders);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
index e5a0c9f..445d019 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
@@ -44,15 +44,15 @@ public class EncoderPassThrough extends Encoder
 {
        private static final long serialVersionUID = -8473768154646831882L;
        
-       protected EncoderPassThrough(int[] ptCols) {
-               super(ptCols); //0-based indexes
+       protected EncoderPassThrough(int[] ptCols, int clen) {
+               super(ptCols, clen); //1-based 
        }
 
        @Override
        public double[] encode(String[] in, double[] out) {
                for( int j=0; j<_colList.length; j++ ) {
-                       String tmp = in[_colList[j]];
-                       out[_colList[j]] = (tmp==null) ? 0 : 
+                       String tmp = in[_colList[j]-1];
+                       out[_colList[j]-1] = (tmp==null) ? 0 : 
                                Double.parseDouble(tmp);
                }
                
@@ -87,12 +87,13 @@ public class EncoderPassThrough extends Encoder
        @Override 
        public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
                for( int j=0; j<_colList.length; j++ ) {
-                       int col = _colList[j];
+                       int col = _colList[j]-1;
                        ValueType vt = in.getSchema().get(col);
                        for( int i=0; i<in.getNumRows(); i++ ) {
                                Object val = in.get(i, col);
-                               out.quickSetValue(i, col,
-                                       UtilFunctions.objectToDouble(vt, val));
+                               out.quickSetValue(i, col, 
(val==null||(vt==ValueType.STRING 
+                                               && val.toString().isEmpty())) ? 
Double.NaN : 
+                                               
UtilFunctions.objectToDouble(vt, val));
                        }
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java 
b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
index 2536b23..b9feb5d 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
@@ -30,12 +30,14 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.sysml.api.jmlc.Connection;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
@@ -44,12 +46,98 @@ import 
org.apache.sysml.runtime.transform.decode.DecoderRecode;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
 public class TfMetaUtils 
 {
-       private static final Log LOG = 
LogFactory.getLog(TfMetaUtils.class.getName());
-
+       /**
+        * Checks if the given transform specification contains an 'omit' group.
+        * 
+        * @param spec transform specification as json string
+        * @return true if the spec requests omitting rows with missing values
+        * @throws DMLRuntimeException if the spec cannot be parsed
+        */
+       public static boolean containsOmitSpec(String spec) throws 
DMLRuntimeException {
+               return (TfMetaUtils.parseJsonIDList(spec, 
TfUtils.TXMETHOD_OMIT).length > 0);   
+       }
+       
+       /**
+        * 
+        * @param spec
+        * @param group
+        * @return
+        * @throws DMLRuntimeException if the json spec cannot be parsed
+        */
+       public static int[] parseJsonIDList(String spec, String group) 
+               throws DMLRuntimeException 
+       {
+               try {
+                       JSONObject jSpec = new JSONObject(spec);
+                       return parseJsonIDList(jSpec, group);
+               }
+               catch(JSONException ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+       }
+       
+       /**
+        * TODO consolidate external and internal json spec definitions
+        *              
+        * @param parsedSpec
+        * @param group
+        * @return
+        * @throws JSONException 
+        */
+       public static int[] parseJsonIDList(JSONObject spec, String group) 
+               throws JSONException
+       {
+               int[] colList = new int[0];
+               
+               if( spec.containsKey(group) ) {
+                       //parse attribute-array or plain array of IDs
+                       JSONArray attrs = null;
+                       if( spec.get(group) instanceof JSONObject )
+                               attrs = (JSONArray) 
((JSONObject)spec.get(group)).get(TfUtils.JSON_ATTRS);
+                       else
+                               attrs = (JSONArray)spec.get(group);             
        
+                       
+                       //construct ID list array
+                       colList = new int[attrs.size()];
+                       for(int i=0; i < colList.length; i++) 
+                               colList[i] = UtilFunctions.toInt(attrs.get(i));
+               
+                       //ensure ascending order of column IDs
+                       Arrays.sort(colList);
+               }
+               
+               return colList;
+       }
+       
+       /**
+        * 
+        * @param spec
+        * @param group
+        * @return
+        * @throws JSONException
+        */
+       public static int[] parseJsonObjectIDList(JSONObject spec, String 
group) 
+               throws JSONException
+       {
+               int[] colList = new int[0];
+               
+               if( spec.containsKey(group) && spec.get(group) instanceof 
JSONArray )
+               {
+                       JSONArray colspecs = (JSONArray)spec.get(group);
+                       colList = new int[colspecs.size()];
+                       for(int j=0; j<colspecs.size(); j++) {
+                               JSONObject colspec = (JSONObject) 
colspecs.get(j);
+                               colList[j] = colspec.getInt("id");
+                       }       
+               }
+               
+               return colList;
+       }
+       
        /**
         * Reads transform meta data from an HDFS file path and converts it 
into an in-memory
         * FrameBlock object.
@@ -63,41 +151,45 @@ public class TfMetaUtils
        public static FrameBlock readTransformMetaDataFromFile(String spec, 
String metapath, String colDelim) 
                throws IOException 
        {
-               //NOTE: this implementation assumes column alignment of 
colnames and coltypes
-               
-               //read column types (for sanity check column names)
-               String coltypesStr = 
MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLTYPES);
-               List<String> coltypes = 
Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP));
-               
                //read column names
                String colnamesStr = 
MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLNAMES);
                List<String> colnames = 
Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim));
-               if( coltypes.size() != colnames.size() ) {
-                       LOG.warn("Number of columns names: "+colnames.size()+" 
(expected: "+coltypes.size()+").");
-                       LOG.warn("--Sample column names: 
"+(!colnames.isEmpty()?colnames.get(0):"null"));
-               }
                
-               //read meta data (currently only recode supported, without 
parsing spec)
+               //read meta data (currently supported: recode, dummycode, bin, 
omit, impute)
+               //note: recode/binning and impute might be applied on the same 
column
                HashMap<String,String> meta = new HashMap<String,String>();
+               HashMap<String,String> mvmeta = new HashMap<String,String>();
                int rows = 0;
                for( int j=0; j<colnames.size(); j++ ) {
                        String colName = colnames.get(j);
+                       //read recode maps for recoded or dummycoded columns
                        String name = 
metapath+File.separator+"Recode"+File.separator+colName;
                        if( 
MapReduceTool.existsFileOnHDFS(name+TfUtils.TXMTD_RCD_MAP_SUFFIX) ) {
                                meta.put(colName, 
MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_MAP_SUFFIX));
                                String ndistinct = 
MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX);
                                rows = Math.max(rows, 
Integer.parseInt(ndistinct));
                        }
-                       else if( coltypes.get(j).equals("2") ) {
-                               LOG.warn("Recode map for column '"+colName+"' 
does not exist.");
+                       //read binning map for binned columns
+                       String name2 = 
metapath+File.separator+"Bin"+File.separator+colName;
+                       if( 
MapReduceTool.existsFileOnHDFS(name2+TfUtils.TXMTD_BIN_FILE_SUFFIX) ) {
+                               String binmap = 
MapReduceTool.readStringFromHDFSFile(name2+TfUtils.TXMTD_BIN_FILE_SUFFIX);
+                               meta.put(colName, binmap);
+                               rows = Math.max(rows, 
Integer.parseInt(binmap.split(TfUtils.TXMTD_SEP)[4]));
+                       }
+                       //read impute value for mv columns
+                       String name3 = 
metapath+File.separator+"Impute"+File.separator+colName;
+                       if( 
MapReduceTool.existsFileOnHDFS(name3+TfUtils.TXMTD_MV_FILE_SUFFIX) ) {
+                               String mvmap = 
MapReduceTool.readStringFromHDFSFile(name3+TfUtils.TXMTD_MV_FILE_SUFFIX);
+                               mvmeta.put(colName, mvmap);
                        }
                }
 
                //get list of recode ids
-               List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes);
+               List<Integer> recodeIDs = parseRecodeColIDs(spec);
+               List<Integer> binIDs = parseBinningColIDs(spec);
                
                //create frame block from in-memory strings
-               return convertToTransformMetaDataFrame(rows, recodeIDs, 
colnames, meta);
+               return convertToTransformMetaDataFrame(rows, colnames, 
recodeIDs, binIDs, meta, mvmeta);
        }
 
        /**
@@ -113,25 +205,18 @@ public class TfMetaUtils
        public static FrameBlock readTransformMetaDataFromPath(String spec, 
String metapath, String colDelim) 
                throws IOException 
        {
-               //NOTE: this implementation assumes column alignment of 
colnames and coltypes
-               
-               //read column types (for sanity check column names)
-               String coltypesStr = 
IOUtilFunctions.toString(Connection.class.getResourceAsStream(metapath+"/"+TfUtils.TXMTD_COLTYPES));
-               List<String> coltypes = 
Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP));
-               
                //read column names
                String colnamesStr = 
IOUtilFunctions.toString(Connection.class.getResourceAsStream(metapath+"/"+TfUtils.TXMTD_COLNAMES));
                List<String> colnames = 
Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim));
-               if( coltypes.size() != colnames.size() ) {
-                       LOG.warn("Number of columns names: "+colnames.size()+" 
(expected: "+coltypes.size()+").");
-                       LOG.warn("--Sample column names: 
"+(!colnames.isEmpty()?colnames.get(0):"null"));
-               }
                
-               //read meta data (currently only recode supported, without 
parsing spec)
+               //read meta data (currently supported: recode, dummycode, bin, 
 omit, impute)
+               //note: recode/binning and impute might be applied on the same 
column
                HashMap<String,String> meta = new HashMap<String,String>();
+               HashMap<String,String> mvmeta = new HashMap<String,String>();
                int rows = 0;
                for( int j=0; j<colnames.size(); j++ ) {
                        String colName = colnames.get(j);
+                       //read recode maps for recoded or dummycoded columns
                        String name = metapath+"/"+"Recode"+"/"+colName;
                        String map = 
IOUtilFunctions.toString(Connection.class.getResourceAsStream(name+TfUtils.TXMTD_RCD_MAP_SUFFIX));
                        if( map != null ) {
@@ -139,16 +224,27 @@ public class TfMetaUtils
                                String ndistinct = 
IOUtilFunctions.toString(Connection.class.getResourceAsStream(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX));
                                rows = Math.max(rows, 
Integer.parseInt(ndistinct));
                        }
-                       else if( coltypes.get(j).equals("2") ) {
-                               LOG.warn("Recode map for column '"+colName+"' 
does not exist.");
+                       //read binning map for binned columns
+                       String name2 = metapath+"/"+"Bin"+"/"+colName;
+                       String map2 = 
IOUtilFunctions.toString(Connection.class.getResourceAsStream(name2+TfUtils.TXMTD_BIN_FILE_SUFFIX));
+                       if( map2 != null ) {
+                               meta.put(colName, map2);
+                               rows = Math.max(rows, 
Integer.parseInt(map2.split(TfUtils.TXMTD_SEP)[4]));
+                       }
+                       //read impute value for mv columns
+                       String name3 = 
metapath+File.separator+"Impute"+File.separator+colName;
+                       String map3 = 
IOUtilFunctions.toString(Connection.class.getResourceAsStream(name3+TfUtils.TXMTD_MV_FILE_SUFFIX));
+                       if( map3 != null ) {
+                               mvmeta.put(colName, map3);
                        }
                }
                
                //get list of recode ids
-               List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes);
+               List<Integer> recodeIDs = parseRecodeColIDs(spec);
+               List<Integer> binIDs = parseBinningColIDs(spec);
                
                //create frame block from in-memory strings
-               return convertToTransformMetaDataFrame(rows, recodeIDs, 
colnames, meta);
+               return convertToTransformMetaDataFrame(rows, colnames, 
recodeIDs, binIDs, meta, mvmeta);
        }
        
        /**
@@ -161,7 +257,8 @@ public class TfMetaUtils
         * @return
         * @throws IOException
         */
-       private static FrameBlock convertToTransformMetaDataFrame(int rows, 
List<Integer> recodeIDs, List<String> colnames, HashMap<String,String> meta) 
+       private static FrameBlock convertToTransformMetaDataFrame(int rows, 
List<String> colnames, List<Integer> rcIDs, List<Integer> binIDs, 
+                       HashMap<String,String> meta, HashMap<String,String> 
mvmeta) 
                throws IOException 
        {
                //create frame block w/ pure string schema
@@ -169,8 +266,8 @@ public class TfMetaUtils
                FrameBlock ret = new FrameBlock(schema, colnames);
                ret.ensureAllocatedColumns(rows);
                
-               //encode recode maps into frame
-               for( Integer colID : recodeIDs ) {
+               //encode recode maps (recoding/dummycoding) into frame
+               for( Integer colID : rcIDs ) {
                        String name = colnames.get(colID-1);
                        String map = meta.get(name);
                        if( map == null )
@@ -179,12 +276,39 @@ public class TfMetaUtils
                        InputStream is = new 
ByteArrayInputStream(map.getBytes("UTF-8"));
                        BufferedReader br = new BufferedReader(new 
InputStreamReader(is));
                        Pair<String,String> pair = new Pair<String,String>();
-                       String line; int rpos = 0;
+                       String line; int rpos = 0; 
                        while( (line = br.readLine()) != null ) {
                                DecoderRecode.parseRecodeMapEntry(line, pair);
                                String tmp = pair.getKey() + 
Lop.DATATYPE_PREFIX + pair.getValue();
                                ret.set(rpos++, colID-1, tmp);
                        }
+                       
ret.getColumnMetadata(colID-1).setNumDistinct((long)rpos);
+               }
+               
+               //encode bin maps (binning) into frame
+               for( Integer colID : binIDs ) {
+                       String name = colnames.get(colID-1);
+                       String map = meta.get(name);
+                       if( map == null )
+                               throw new IOException("Binning map for column 
'"+name+"' (id="+colID+") not existing.");
+                       String[] fields = map.split(TfUtils.TXMTD_SEP);
+                       double min = UtilFunctions.parseToDouble(fields[1]);
+                       double binwidth = 
UtilFunctions.parseToDouble(fields[3]);
+                       int nbins = UtilFunctions.parseToInt(fields[4]);
+                       //materialize bins to support equi-width/equi-height
+                       for( int i=0; i<nbins; i++ ) { 
+                               String lbound = String.valueOf(min+i*binwidth);
+                               String ubound = 
String.valueOf(min+(i+1)*binwidth);
+                               ret.set(i, colID-1, 
lbound+Lop.DATATYPE_PREFIX+ubound);
+                       }
+                       
ret.getColumnMetadata(colID-1).setNumDistinct((long)nbins);
+               }
+               
+               //encode impute meta data into frame
+               for( Entry<String, String> e : mvmeta.entrySet() ) {
+                       int colID = colnames.indexOf(e.getKey()) + 1;
+                       String mvVal = e.getValue().split(TfUtils.TXMTD_SEP)[1];
+                       ret.getColumnMetadata(colID-1).setMvValue(mvVal);
                }
                
                return ret;
@@ -199,33 +323,23 @@ public class TfMetaUtils
         * @return
         * @throws IOException
         */
-       private static ArrayList<Integer> parseRecodeColIDs(String spec, 
List<String> coltypes) 
+       @SuppressWarnings("unchecked")
+       private static List<Integer> parseRecodeColIDs(String spec) 
                throws IOException 
        {       
-               ArrayList<Integer> specRecodeIDs = new ArrayList<Integer>();
+               if( spec == null )
+                       throw new IOException("Missing transform 
specification.");
+               
+               List<Integer> specRecodeIDs = null;
                
                try {
-                       if( spec != null ) {
-                               //parse json transform specification for recode 
col ids
-                               JSONObject jSpec = new JSONObject(spec);
-                               if ( 
jSpec.containsKey(TfUtils.TXMETHOD_RECODE))  {
-                                       JSONArray attrs = null; //TODO simplify 
once json spec consolidated
-                                       if( jSpec.get(TfUtils.TXMETHOD_RECODE) 
instanceof JSONObject ) {
-                                               JSONObject obj = (JSONObject) 
jSpec.get(TfUtils.TXMETHOD_RECODE);
-                                               attrs = (JSONArray) 
obj.get(TfUtils.JSON_ATTRS);
-                                       }
-                                       else
-                                               attrs = 
(JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE);                          
-                                       for(int j=0; j<attrs.length(); j++) 
-                                               
specRecodeIDs.add(UtilFunctions.toInt(attrs.get(j)));
-                               }
-                       }
-                       else {
-                               //obtain recode col ids from coltypes 
-                               for( int j=0; j<coltypes.size(); j++ )
-                                       if( coltypes.get(j).equals("2") )
-                                               specRecodeIDs.add(j+1);
-                       }
+                       //parse json transform specification for recode col ids
+                       JSONObject jSpec = new JSONObject(spec);
+                       List<Integer> rcIDs = Arrays.asList(ArrayUtils.toObject(
+                                       TfMetaUtils.parseJsonIDList(jSpec, 
TfUtils.TXMETHOD_RECODE)));
+                       List<Integer> dcIDs = Arrays.asList(ArrayUtils.toObject(
+                                       TfMetaUtils.parseJsonIDList(jSpec, 
TfUtils.TXMETHOD_DUMMYCODE))); 
+                       specRecodeIDs = new 
ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs));
                }
                catch(Exception ex) {
                        throw new IOException(ex);
@@ -233,4 +347,46 @@ public class TfMetaUtils
                
                return specRecodeIDs;
        }
+       
+       /**
+        * 
+        * @param spec
+        * @return
+        * @throws IOException
+        */
+       public static List<Integer> parseBinningColIDs(String spec) 
+               throws IOException 
+       {
+               try {
+                       JSONObject jSpec = new JSONObject(spec);
+                       return parseBinningColIDs(jSpec);
+               }
+               catch(JSONException ex) {
+                       throw new IOException(ex);
+               }
+       }
+       
+       /**
+        * 
+        * @param jSpec
+        * @return
+        * @throws IOException
+        */
+       public static List<Integer> parseBinningColIDs(JSONObject jSpec) 
+               throws IOException 
+       {
+               try {
+                       if( jSpec.containsKey(TfUtils.TXMETHOD_BIN) && 
jSpec.get(TfUtils.TXMETHOD_BIN) instanceof JSONArray ) {
+                               return Arrays.asList(ArrayUtils.toObject(
+                                               
TfMetaUtils.parseJsonObjectIDList(jSpec, TfUtils.TXMETHOD_BIN)));       
+                       }
+                       else { //internally generated bin column IDs
+                               return Arrays.asList(ArrayUtils.toObject(
+                                               
TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_BIN)));     
+                       }
+               }
+               catch(JSONException ex) {
+                       throw new IOException(ex);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java 
b/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java
new file mode 100644
index 0000000..a17287f
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfOffsetMap.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.meta;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.sysml.runtime.matrix.data.Pair;
+
+/**
+ * 
+ */
+public class TfOffsetMap implements Serializable
+{
+       private static final long serialVersionUID = 8949124761287236703L;
+
+       private HashMap<Long, Long> _map = null;
+       private long _rmRows = -1;
+       
+       public TfOffsetMap( List<Pair<Long,Long>> rmRows ) {
+               //sort input list of <key, rm rows per block>
+               TreeMap<Long, Long> tmap = new TreeMap<Long, Long>();
+               for( Pair<Long, Long> pair : rmRows )
+                       tmap.put(pair.getKey(), pair.getValue());
+               
+               //compute shifted keys and build hash table
+               _map = new HashMap<Long, Long>();
+               long shift = 0;
+               for( Entry<Long, Long> e : tmap.entrySet() ) {
+                       _map.put(e.getKey(), e.getKey()-shift);
+                       shift += e.getValue();
+               }
+               _rmRows = shift;
+       }
+
+       /**
+        * 
+        * @param oldKey
+        * @return
+        */
+       public long getOffset(long oldKey) {
+               return _map.get(oldKey);
+       }       
+       
+       /**
+        * 
+        * @return
+        */
+       public long getNumRmRows() {
+               return _rmRows;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index e292b39..3711a1a 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -19,6 +19,9 @@
 
 package org.apache.sysml.runtime.util;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData;
@@ -441,4 +444,18 @@ public class UtilFunctions
                else 
                        return Long.parseLong(arg.substring(0,arg.length()));
        }
+       
+       /**
+        * Creates the sequence of integers low, low+incr, low+2*incr, ...
+        * up to (and including, if hit exactly) the upper bound.
+        * 
+        * @param low   lower bound (inclusive)
+        * @param up    upper bound (inclusive)
+        * @param incr  increment (step size between consecutive values)
+        * @return list of integers from low up to at most up, in steps of incr
+        */
+       public static List<Integer> getSequenceList(int low, int up, int incr) {
+               ArrayList<Integer> ret = new ArrayList<Integer>();
+               for( int i=low; i<=up; i+=incr )
+                       ret.add(i);
+               return ret;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java
index 1abe68a..2ce82f3 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java
@@ -20,16 +20,22 @@
 package org.apache.sysml.test.integration.functions.jmlc;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.sysml.api.jmlc.Connection;
 import org.apache.sysml.api.jmlc.PreparedScript;
 import org.apache.sysml.api.jmlc.ResultVariables;
 import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.test.integration.AutomatedTestBase;
@@ -101,19 +107,19 @@ public class FrameReadMetaTest extends AutomatedTestBase
                //establish connection to SystemML
                Connection conn = new Connection();
                
-               //read meta data frame
+               //read meta data frame 
                String spec = MapReduceTool.readStringFromHDFSFile(SCRIPT_DIR + 
TEST_DIR+"tfmtd_example/spec.json");
                FrameBlock M = readFrame ?
                                
DataConverter.convertToFrameBlock(conn.readStringFrame(SCRIPT_DIR + 
TEST_DIR+"tfmtd_frame_example/tfmtd_frame")) : 
-                               conn.readTransformMetaDataFromFile(useSpec ? 
spec : null, SCRIPT_DIR + TEST_DIR+"tfmtd_example/");
-               
-               //generate data based on recode maps
-               HashMap<String,Long>[] RC = getRecodeMaps(M);
-               double[][] X = generateData(rows, cols, RC);
-               String[][] F = null;
+                               conn.readTransformMetaDataFromFile(spec, 
SCRIPT_DIR + TEST_DIR+"tfmtd_example/");
                
                try
                {
+                       //generate data based on recode maps
+                       HashMap<String,Long>[] RC = getRecodeMaps(spec, M);
+                       double[][] X = generateData(rows, cols, RC);
+                       String[][] F = null;
+                       
                        //prepare input arguments
                        HashMap<String,String> args = new 
HashMap<String,String>();
                        args.put("$TRANSFORM_SPEC", spec);
@@ -139,6 +145,17 @@ public class FrameReadMetaTest extends AutomatedTestBase
                                //get output parameter
                                F = rs.getFrame("F");
                        }
+                       
+
+                       //check correct result 
+                       //for all generated data, probe recode maps and compare 
versus output
+                       for( int i=0; i<rows; i++ ) 
+                               for( int j=0; j<cols; j++ ) 
+                                       if( RC[j] != null ) {
+                                               Assert.assertEquals("Wrong 
result: "+F[i][j]+".", 
+                                                               
Double.valueOf(X[i][j]), 
+                                                               
Double.valueOf(RC[j].get(F[i][j]).toString()));
+                                       }       
                }
                catch(Exception ex) {
                        ex.printStackTrace();
@@ -149,31 +166,26 @@ public class FrameReadMetaTest extends AutomatedTestBase
                        if( conn != null )
                                conn.close();
                }
-               
-               //check correct result 
-               //for all generated data, probe recode maps and compare versus 
output
-               for( int i=0; i<rows; i++ ) 
-                       for( int j=0; j<cols; j++ ) 
-                               if( RC[j] != null ) {
-                                       Assert.assertEquals("Wrong result: 
"+F[i][j]+".", 
-                                                       
Double.valueOf(X[i][j]), 
-                                                       
Double.valueOf(RC[j].get(F[i][j]).toString()));
-                               }       
        }
 
        /**
         * 
         * @param M
         * @return
+        * @throws DMLRuntimeException 
         */
        @SuppressWarnings("unchecked")
-       private HashMap<String,Long>[] getRecodeMaps(FrameBlock M) {
+       private HashMap<String,Long>[] getRecodeMaps(String spec, FrameBlock M) 
+               throws DMLRuntimeException 
+       {
+               List<Integer> collist = Arrays.asList(ArrayUtils.toObject(
+                               TfMetaUtils.parseJsonIDList(spec, 
TfUtils.TXMETHOD_RECODE)));
                HashMap<String,Long>[] ret = new HashMap[M.getNumColumns()];
                Iterator<Object[]> iter = M.getObjectRowIterator();
                while( iter.hasNext() ) {
                        Object[] tmp = iter.next();
                        for( int j=0; j<tmp.length; j++ ) 
-                               if( tmp[j] != null ) {
+                               if( collist.contains(j+1) && tmp[j] != null ) {
                                        if( ret[j] == null )
                                                ret[j] = new 
HashMap<String,Long>();
                                        String[] parts = 
tmp[j].toString().split(Lop.DATATYPE_PREFIX);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
index e8837cc..6921b77 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
@@ -39,10 +39,14 @@ public class TransformFrameTest extends AutomatedTestBase
        private final static String TEST_DIR = "functions/transform/";
        private final static String TEST_CLASS_DIR = TEST_DIR + 
TransformFrameTest.class.getSimpleName() + "/";
        
-       private final static String DATASET     = "homes3/homes.csv";
+       //dataset and transform tasks without missing values
+       private final static String DATASET1    = "homes3/homes.csv";
        private final static String SPEC1               = 
"homes3/homes.tfspec_recode.json"; 
        private final static String SPEC2               = 
"homes3/homes.tfspec_dummy.json";
-       private final static String SPEC3               = 
"homes3/homes.tfspec_bin.json";
+       private final static String SPEC3               = 
"homes3/homes.tfspec_bin.json"; //incl recode
+       
+       //dataset and transform tasks with missing values
+       private final static String DATASET2    = "homes/homes.csv";
        private final static String SPEC4               = 
"homes3/homes.tfspec_impute.json";
        private final static String SPEC5               = 
"homes3/homes.tfspec_omit.json";
        
@@ -70,6 +74,46 @@ public class TransformFrameTest extends AutomatedTestBase
        public void testHomesRecodeSparkCSV() {
                runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.RECODE);
        }
+       
+       @Test
+       public void testHomesDummycodeSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.DUMMY);
+       }
+       
+       @Test
+       public void testHomesDummycodeSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.DUMMY);
+       }
+       
+       @Test
+       public void testHomesBinningSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.BIN);
+       }
+       
+       @Test
+       public void testHomesBinningSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.BIN);
+       }
+       
+       @Test
+       public void testHomesOmitSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.OMIT);
+       }
+       
+       @Test
+       public void testHomesOmitSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.OMIT);
+       }
+       
+       @Test
+       public void testHomesImputeSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.IMPUTE);
+       }
+       
+       @Test
+       public void testHomesImputeSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.IMPUTE);
+       }
 
        /**
         * 
@@ -89,13 +133,13 @@ public class TransformFrameTest extends AutomatedTestBase
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
                //set transform specification
-               String SPEC = null;
+               String SPEC = null; String DATASET = null;
                switch( type ) {
-                       case RECODE: SPEC = SPEC1; break;
-                       case DUMMY:  SPEC = SPEC2; break;
-                       case BIN:    SPEC = SPEC3; break;
-                       case IMPUTE: SPEC = SPEC4; break;
-                       case OMIT:   SPEC = SPEC5; break;
+                       case RECODE: SPEC = SPEC1; DATASET = DATASET1; break;
+                       case DUMMY:  SPEC = SPEC2; DATASET = DATASET1; break;
+                       case BIN:    SPEC = SPEC3; DATASET = DATASET1; break;
+                       case IMPUTE: SPEC = SPEC4; DATASET = DATASET2; break;
+                       case OMIT:   SPEC = SPEC5; DATASET = DATASET2; break;
                }
 
                if( !ofmt.equals("csv") )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/java/org/apache/sysml/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/utils/TestUtils.java 
b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
index 4f97f7f..78ebf60 100644
--- a/src/test/java/org/apache/sysml/test/utils/TestUtils.java
+++ b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
@@ -730,8 +730,10 @@ public class TestUtils
                int countErrors = 0;
                for (int i = 0; i < rows; i++) {
                        for (int j = 0; j < cols; j++) {
-                               if (!compareCellValue(expectedMatrix[i][j], 
actualMatrix[i][j], epsilon, false))
+                               if (!compareCellValue(expectedMatrix[i][j], 
actualMatrix[i][j], epsilon, false)) {
+                                       System.out.println(expectedMatrix[i][j] 
+" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
                                        countErrors++;
+                               }
                        }
                }
                assertTrue("" + countErrors + " values are not in equal", 
countErrors == 0);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json
----------------------------------------------------------------------
diff --git 
a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json 
b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json
new file mode 100644
index 0000000..c2200ae
--- /dev/null
+++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_bin.json
@@ -0,0 +1,5 @@
+{
+ "ids": true, "recode": [ 1, 2, 7 ], "bin": [
+ { "id": 8  , "method": "equi-width", "numbins": 3 }
+ ,{ "id": 3, "method": "equi-width", "numbins": 4 }]
+  }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json
----------------------------------------------------------------------
diff --git 
a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json 
b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json
new file mode 100644
index 0000000..e398b7f
--- /dev/null
+++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_dummy.json
@@ -0,0 +1,2 @@
+{
+ "ids": true, "dummycode": [ 2, 7, 1 ] }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json
----------------------------------------------------------------------
diff --git 
a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json 
b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json
new file mode 100644
index 0000000..0757a03
--- /dev/null
+++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_impute.json
@@ -0,0 +1,10 @@
+{
+ "ids": true, "impute": [
+                 { "id": 1, "method": "global_mode" }
+                ,{ "id": 2, "method": "constant", "value": "south" }
+                ,{ "id": 4, "method": "constant", "value": "2" }
+                ,{ "id": 5, "method": "constant", "value": "1" }
+                ,{ "id": 6, "method": "constant", "value": "1" }
+                ,{ "id": 7, "method": "global_mode" }
+                ,{ "id": 9, "method": "global_mean" }
+], "recode": [ 2, 7 ] }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json
----------------------------------------------------------------------
diff --git 
a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json 
b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json
new file mode 100644
index 0000000..1611f07
--- /dev/null
+++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_omit.json
@@ -0,0 +1,2 @@
+{
+ "ids": true, "omit": [ 1,2,4,5,6,7,8,9 ], "recode": [ 2, 7 ] }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8f7e8cca/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
index 2bd12b2..dbf9302 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
        ScalingTest.class,
        TransformAndApplyTest.class,
        TransformEncodeDecodeTest.class,
+       TransformFrameTest.class,
        TransformReadMetaTest.class,
        TransformTest.class,
 })

Reply via email to