[SYSTEMML-613] Refactoring transform encoders (api, simplify, cleanup)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/d9e0748b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/d9e0748b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/d9e0748b

Branch: refs/heads/master
Commit: d9e0748b895f6ef7d8f22542c0630036e12b596f
Parents: c8c8d81
Author: Matthias Boehm <[email protected]>
Authored: Mon Apr 4 20:54:51 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Apr 5 00:18:17 2016 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/api/jmlc/Connection.java   |  12 +-
 .../runtime/transform/ApplyTfCSVSPARK.java      |   1 -
 .../sysml/runtime/transform/BinAgent.java       | 147 ++++++--------
 .../sysml/runtime/transform/DataTransform.java  |  83 ++++----
 .../sysml/runtime/transform/DistinctValue.java  |  31 ++-
 .../sysml/runtime/transform/DummycodeAgent.java | 138 +++++---------
 .../sysml/runtime/transform/MVImputeAgent.java  | 191 ++++++++-----------
 .../sysml/runtime/transform/OmitAgent.java      |  71 ++-----
 .../sysml/runtime/transform/RecodeAgent.java    | 154 ++++++---------
 .../apache/sysml/runtime/transform/TfUtils.java | 114 ++++++-----
 .../runtime/transform/TransformationAgent.java  |  87 ---------
 .../sysml/runtime/transform/decode/Decoder.java |   3 +-
 .../transform/decode/DecoderComposite.java      |   3 +-
 .../transform/decode/DecoderFactory.java        |  13 +-
 .../runtime/transform/decode/DecoderRecode.java |   3 +-
 .../sysml/runtime/transform/encode/Encoder.java | 104 ++++++++++
 .../functions/transform/ScalingTest.java        |   5 +-
 17 files changed, 501 insertions(+), 659 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java 
b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index abb22ed..00477cf 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -63,8 +63,6 @@ import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.TfUtils;
-import org.apache.sysml.runtime.transform.TransformationAgent;
-import org.apache.sysml.runtime.transform.TransformationAgent.TX_METHOD;
 import org.apache.sysml.runtime.transform.decode.DecoderRecode;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.MapReduceTool;
@@ -462,14 +460,14 @@ public class Connection
                        
                        //parse json transform specification
                        JSONObject jSpec = new JSONObject(spec);
-                       if ( jSpec.containsKey(TX_METHOD.RECODE.toString()))  {
+                       if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE))  {
                                JSONArray attrs = null; //TODO simplify once 
json spec consolidated
-                               if( jSpec.get(TX_METHOD.RECODE.toString()) 
instanceof JSONObject ) {
-                                       JSONObject obj = (JSONObject) 
jSpec.get(TX_METHOD.RECODE.toString());
-                                       attrs = (JSONArray) 
obj.get(TransformationAgent.JSON_ATTRS);
+                               if( jSpec.get(TfUtils.TXMETHOD_RECODE) 
instanceof JSONObject ) {
+                                       JSONObject obj = (JSONObject) 
jSpec.get(TfUtils.TXMETHOD_RECODE);
+                                       attrs = (JSONArray) 
obj.get(TfUtils.JSON_ATTRS);
                                }
                                else
-                                       attrs = 
(JSONArray)jSpec.get(TX_METHOD.RECODE.toString());                              
+                                       attrs = 
(JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE);                          
                                for(int j=0; j<attrs.length(); j++) 
                                        
specRecodeIDs.add(UtilFunctions.toInt(attrs.get(j)));
                        }       

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java 
b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
index 9b4f70c..c1d99e6 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
@@ -124,7 +124,6 @@ public class ApplyTfCSVSPARK {
                                        _tfmapper.processHeaderLine();
                                        
                                        if (_tfmapper.hasHeader() ) {
-                                               //outLines.add(dcdHeader); // 
if the header needs to be preserved in the output file
                                                continue; 
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index 8a7199e..4f24afd 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -41,68 +41,67 @@ import org.apache.wink.json4j.JSONObject;
 import scala.Tuple2;
 
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
+import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
-public class BinAgent extends TransformationAgent {
-       
+public class BinAgent extends Encoder 
+{      
        private static final long serialVersionUID = 1917445005206076078L;
 
        public static final String MIN_PREFIX = "min";
        public static final String MAX_PREFIX = "max";
        public static final String NBINS_PREFIX = "nbins";
 
-       private int[] _binList = null;
-       //private byte[] _binMethodList = null; // Not used, since only 
equi-width is supported for now. 
        private int[] _numBins = null;
-
        private double[] _min=null, _max=null;  // min and max among 
non-missing values
-
        private double[] _binWidths = null;             // width of a bin for 
each attribute
        
-       BinAgent() { }
+       public BinAgent() {
+               super( null );
+       }
        
-       BinAgent(JSONObject parsedSpec) throws JSONException {
-               
-               if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) )
+       public BinAgent(JSONObject parsedSpec) 
+               throws JSONException 
+       {
+               super( null );          
+               if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
                        return;
                
-               JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.BIN.toString());
-               
-               JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-               //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD);
-               JSONArray nbins = (JSONArray) obj.get(JSON_NBINS);
+               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_BIN);             
+               JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
+               JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
                        
-               assert(attrs.size() == nbins.size());
-                       
-               _binList = new int[attrs.size()];
+               initColList(attrs);
                _numBins = new int[attrs.size()];
-               for(int i=0; i < _binList.length; i++) {
-                       _binList[i] = UtilFunctions.toInt(attrs.get(i));
+               for(int i=0; i < _numBins.length; i++)
                        _numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
-               }
                
                // initialize internal transformation metadata
-               _min = new double[_binList.length];
+               _min = new double[_colList.length];
                Arrays.fill(_min, Double.MAX_VALUE);
-               _max = new double[_binList.length];
+               _max = new double[_colList.length];
                Arrays.fill(_max, -Double.MAX_VALUE);
                
-               _binWidths = new double[_binList.length];
+               _binWidths = new double[_colList.length];
        }
+
+       public int[] getNumBins() { return _numBins; }
+       public double[] getMin()  { return _min; }
+       public double[] getBinWidths() { return _binWidths; }
        
        public void prepare(String[] words, TfUtils agents) {
-               if ( _binList == null )
+               if ( !isApplicable() )
                        return;
                
-               for(int i=0; i <_binList.length; i++) {
-                       int colID = _binList[i];
+               for(int i=0; i <_colList.length; i++) {
+                       int colID = _colList[i];
                        
                        String w = null;
                        double d = 0;
                                
                        // equi-width
                        w = UtilFunctions.unquote(words[colID-1].trim());
-                       if(!agents.isNA(w)) {
+                       if(!TfUtils.isNA(agents.getNAStrings(),w)) {
                                d = UtilFunctions.parseToDouble(w);
                                if(d < _min[i])
                                        _min[i] = d;
@@ -136,12 +135,12 @@ public class BinAgent extends TransformationAgent {
         */
        @Override
        public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException {
-               if ( _binList == null )
+               if( !isApplicable() )
                        return;
                
                try { 
-                       for(int i=0; i < _binList.length; i++) {
-                               int colID = _binList[i];
+                       for(int i=0; i < _colList.length; i++) {
+                               int colID = _colList[i];
                                IntWritable iw = new IntWritable(-colID);
                                
                                out.collect(iw,  prepMinOutput(i));
@@ -154,12 +153,12 @@ public class BinAgent extends TransformationAgent {
        }
        
        public ArrayList<Tuple2<Integer, DistinctValue>> 
mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, 
DistinctValue>> list, TfUtils agents) throws IOException {
-               if ( _binList == null )
+               if ( !isApplicable() )
                        return list;
                
                try { 
-                       for(int i=0; i < _binList.length; i++) {
-                               int colID = _binList[i];
+                       for(int i=0; i < _colList.length; i++) {
+                               int colID = _colList[i];
                                Integer iw = -colID;
                                
                                list.add( new Tuple2<Integer,DistinctValue>(iw, 
prepMinOutput(i)) );
@@ -174,7 +173,7 @@ public class BinAgent extends TransformationAgent {
 
        private void writeTfMtd(int colID, String min, String max, String 
binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws 
IOException 
        {
-               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
BIN_FILE_SUFFIX);
+               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
TfUtils.BIN_FILE_SUFFIX);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + 
max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n");
                br.close();
@@ -225,15 +224,15 @@ public class BinAgent extends TransformationAgent {
        
        
        public void outputTransformationMetadata(String outputDir, FileSystem 
fs, TfUtils agents) throws IOException {
-               if(_binList == null)
+               if( !isApplicable() )
                        return;
                
                MVImputeAgent mvagent = agents.getMVImputeAgent();
-               for(int i=0; i < _binList.length; i++) {
-                       int colID = _binList[i];
+               for(int i=0; i < _colList.length; i++) {
+                       int colID = _colList[i];
                        
                        // If the column is imputed with a constant, then 
adjust min and max based the value of the constant.
-                       if ( mvagent.isImputed(colID) != -1 && 
mvagent.getMethod(colID) == MVMethod.CONSTANT ) 
+                       if ( mvagent.isApplicable(colID) != -1 && 
mvagent.getMethod(colID) == MVMethod.CONSTANT ) 
                        {
                                double cst = UtilFunctions.parseToDouble( 
mvagent.getReplacement(colID) );
                                if ( cst < _min[i])
@@ -249,11 +248,6 @@ public class BinAgent extends TransformationAgent {
        
        // 
------------------------------------------------------------------------------------------------
 
-       public int[] getBinList() { return _binList; }
-       public int[] getNumBins() { return _numBins; }
-       public double[] getMin()  { return _min; }
-       public double[] getBinWidths() { return _binWidths; }
-       
        /**
         * Method to load transform metadata for all attributes
         * 
@@ -262,14 +256,14 @@ public class BinAgent extends TransformationAgent {
         */
        @Override
        public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
-               if ( _binList == null )
+               if( !isApplicable() )
                        return;
                
                if(fs.isDirectory(txMtdDir)) {
-                       for(int i=0; i<_binList.length;i++) {
-                               int colID = _binList[i];
+                       for(int i=0; i<_colList.length;i++) {
+                               int colID = _colList[i];
                                
-                               Path path = new Path( txMtdDir + "/Bin/" + 
agents.getName(colID) + BIN_FILE_SUFFIX);
+                               Path path = new Path( txMtdDir + "/Bin/" + 
agents.getName(colID) + TfUtils.BIN_FILE_SUFFIX);
                                TfUtils.checkValidInputFile(fs, path, true); 
                                        
                                BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
@@ -300,56 +294,27 @@ public class BinAgent extends TransformationAgent {
         * @return
         */
        @Override
-       public String[] apply(String[] words, TfUtils agents) {
-               if ( _binList == null )
+       public String[] apply(String[] words) {
+               if( !isApplicable() )
                        return words;
        
-               for(int i=0; i < _binList.length; i++) {
-                       int colID = _binList[i];
-                       
+               for(int i=0; i < _colList.length; i++) {
+                       int colID = _colList[i];
                        try {
-                       double val = 
UtilFunctions.parseToDouble(words[colID-1]);
-                       int binid = 1;
-                       double tmp = _min[i] + _binWidths[i];
-                       while(val > tmp && binid < _numBins[i]) {
-                               tmp += _binWidths[i];
-                               binid++;
-                       }
-                       words[colID-1] = Integer.toString(binid);
-                       } catch(NumberFormatException e)
-                       {
+                               double val = 
UtilFunctions.parseToDouble(words[colID-1]);
+                               int binid = 1;
+                               double tmp = _min[i] + _binWidths[i];
+                               while(val > tmp && binid < _numBins[i]) {
+                                       tmp += _binWidths[i];
+                                       binid++;
+                               }
+                               words[colID-1] = Integer.toString(binid);
+                       } 
+                       catch(NumberFormatException e) {
                                throw new RuntimeException("Encountered \"" + 
words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric 
value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an 
appropriate imputation method.");
                        }
                }
                
                return words;
        }
-       
-       /**
-        * Check if the given column ID is subjected to this transformation.
-        * 
-        */
-       public int isBinned(int colID)
-       {
-               if(_binList == null)
-                       return -1;
-               
-               int idx = Arrays.binarySearch(_binList, colID);
-               return ( idx >= 0 ? idx : -1);
-       }
-
-
-       @Override
-       public void print() {
-               System.out.print("Binning List (Equi-width): \n    ");
-               for(int i : _binList) {
-                       System.out.print(i + " ");
-               }
-               System.out.print("\n    ");
-               for(int b : _numBins) {
-                       System.out.print(b + " ");
-               }
-               System.out.println();
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java 
b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
index 16f6228..a88923f 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
@@ -78,7 +78,6 @@ import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.transform.TransformationAgent.TX_METHOD;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.utils.JSONHelper;
@@ -237,8 +236,8 @@ public class DataTransform
                
                // 
--------------------------------------------------------------------------
                // Omit
-               if( inputSpec.containsKey(TX_METHOD.OMIT.toString()) ) {
-                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TX_METHOD.OMIT.toString());
+               if( inputSpec.containsKey(TfUtils.TXMETHOD_OMIT) ) {
+                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TfUtils.TXMETHOD_OMIT);
                        omitList = new int[arrtmp.size()];
                        for(int i=0; i<arrtmp.size(); i++) {
                                if(byPositions)
@@ -254,8 +253,8 @@ public class DataTransform
                        omitList = null;
                // 
--------------------------------------------------------------------------
                // Missing value imputation
-               if( inputSpec.containsKey(TX_METHOD.IMPUTE.toString()) ) {
-                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TX_METHOD.IMPUTE.toString());
+               if( inputSpec.containsKey(TfUtils.TXMETHOD_IMPUTE) ) {
+                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TfUtils.TXMETHOD_IMPUTE);
                        
                        mvList = new int[arrtmp.size()];
                        mvMethods = new byte[arrtmp.size()];
@@ -306,8 +305,8 @@ public class DataTransform
                        mvList = null;
                // 
--------------------------------------------------------------------------
                // Recoding
-               if( inputSpec.containsKey(TX_METHOD.RECODE.toString()) ) {
-                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TX_METHOD.RECODE.toString());
+               if( inputSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) {
+                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TfUtils.TXMETHOD_RECODE);
                        rcdList = new int[arrtmp.size()];
                        for(int i=0; i<arrtmp.size(); i++) {
                                if (byPositions)
@@ -323,8 +322,8 @@ public class DataTransform
                        rcdList = null;
                // 
--------------------------------------------------------------------------
                // Binning
-               if( inputSpec.containsKey(TX_METHOD.BIN.toString()) ) {
-                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TX_METHOD.BIN.toString());
+               if( inputSpec.containsKey(TfUtils.TXMETHOD_BIN) ) {
+                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TfUtils.TXMETHOD_BIN);
                        
                        binList = new int[arrtmp.size()];
                        binMethods = new byte[arrtmp.size()];
@@ -349,7 +348,7 @@ public class DataTransform
                                        throw new IOException("Unknown missing 
value imputation method (" + stmp + ") in transformation specification: " + 
specWithNames);
                                binMethods[i] = btmp;
                                
-                               numBins[i] = 
entry.get(TransformationAgent.JSON_NBINS);
+                               numBins[i] = entry.get(TfUtils.JSON_NBINS);
                                if ( ((Integer) numBins[i]).intValue() <= 1 ) 
                                        throw new 
IllegalArgumentException("Invalid transformation on column \"" + (String) 
entry.get(NAME) + "\". Number of bins must be greater than 1.");
                        }
@@ -371,8 +370,8 @@ public class DataTransform
                        binList = null;
                // 
--------------------------------------------------------------------------
                // Dummycoding
-               if( inputSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) ) {
-                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TX_METHOD.DUMMYCODE.toString());
+               if( inputSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) {
+                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TfUtils.TXMETHOD_DUMMYCODE);
                        dcdList = new int[arrtmp.size()];
                        for(int i=0; i<arrtmp.size(); i++) {
                                if (byPositions)
@@ -388,8 +387,8 @@ public class DataTransform
                        dcdList = null;
                // 
--------------------------------------------------------------------------
                // Scaling
-               if(inputSpec.containsKey(TX_METHOD.SCALE.toString()) ) {
-                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TX_METHOD.SCALE.toString());
+               if(inputSpec.containsKey(TfUtils.TXMETHOD_SCALE) ) {
+                       JSONArray arrtmp = (JSONArray) 
inputSpec.get(TfUtils.TXMETHOD_SCALE);
                        
                        scaleList = new int[arrtmp.size()];
                        scaleMethods = new byte[arrtmp.size()];
@@ -535,55 +534,55 @@ public class DataTransform
                if (omitList != null)
                {
                        JSONObject rcdSpec = new JSONObject();
-                       rcdSpec.put(TransformationAgent.JSON_ATTRS, 
toJSONArray(omitList));
-                       outputSpec.put(TX_METHOD.OMIT.toString(), rcdSpec);
+                       rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(omitList));
+                       outputSpec.put(TfUtils.TXMETHOD_OMIT, rcdSpec);
                }
                
                if (mvList != null)
                {
                        JSONObject mvSpec = new JSONObject();
-                       mvSpec.put(TransformationAgent.JSON_ATTRS, 
toJSONArray(mvList));
-                       mvSpec.put(TransformationAgent.JSON_MTHD, 
toJSONArray(mvMethods));
-                       mvSpec.put(TransformationAgent.JSON_CONSTS, 
toJSONArray(mvConstants));
-                       outputSpec.put(TX_METHOD.IMPUTE.toString(), mvSpec);
+                       mvSpec.put(TfUtils.JSON_ATTRS, toJSONArray(mvList));
+                       mvSpec.put(TfUtils.JSON_MTHD, toJSONArray(mvMethods));
+                       mvSpec.put(TfUtils.JSON_CONSTS, 
toJSONArray(mvConstants));
+                       outputSpec.put(TfUtils.TXMETHOD_IMPUTE, mvSpec);
                }
                
                if (rcdList != null)
                {
                        JSONObject rcdSpec = new JSONObject();
-                       rcdSpec.put(TransformationAgent.JSON_ATTRS, 
toJSONArray(rcdList));
-                       outputSpec.put(TX_METHOD.RECODE.toString(), rcdSpec);
+                       rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(rcdList));
+                       outputSpec.put(TfUtils.TXMETHOD_RECODE, rcdSpec);
                }
                
                if (binList != null)
                {
                        JSONObject binSpec = new JSONObject();
-                       binSpec.put(TransformationAgent.JSON_ATTRS, 
toJSONArray(binList));
-                       binSpec.put(TransformationAgent.JSON_MTHD, 
toJSONArray(binMethods));
-                       binSpec.put(TransformationAgent.JSON_NBINS, 
toJSONArray(numBins));
-                       outputSpec.put(TX_METHOD.BIN.toString(), binSpec);
+                       binSpec.put(TfUtils.JSON_ATTRS, toJSONArray(binList));
+                       binSpec.put(TfUtils.JSON_MTHD, toJSONArray(binMethods));
+                       binSpec.put(TfUtils.JSON_NBINS, toJSONArray(numBins));
+                       outputSpec.put(TfUtils.TXMETHOD_BIN, binSpec);
                }
                
                if (dcdList != null)
                {
                        JSONObject dcdSpec = new JSONObject();
-                       dcdSpec.put(TransformationAgent.JSON_ATTRS, 
toJSONArray(dcdList));
-                       outputSpec.put(TX_METHOD.DUMMYCODE.toString(), dcdSpec);
+                       dcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(dcdList));
+                       outputSpec.put(TfUtils.TXMETHOD_DUMMYCODE, dcdSpec);
                }
                
                if (scaleList != null)
                {
                        JSONObject scaleSpec = new JSONObject();
-                       scaleSpec.put(TransformationAgent.JSON_ATTRS, 
toJSONArray(scaleList));
-                       scaleSpec.put(TransformationAgent.JSON_MTHD, 
toJSONArray(scaleMethods));
-                       outputSpec.put(TX_METHOD.SCALE.toString(), scaleSpec);
+                       scaleSpec.put(TfUtils.JSON_ATTRS, 
toJSONArray(scaleList));
+                       scaleSpec.put(TfUtils.JSON_MTHD, 
toJSONArray(scaleMethods));
+                       outputSpec.put(TfUtils.TXMETHOD_SCALE, scaleSpec);
                }
                
                if (mvrcdList != null)
                {
                        JSONObject mvrcd = new JSONObject();
-                       mvrcd.put(TransformationAgent.JSON_ATTRS, 
toJSONArray(mvrcdList));
-                       outputSpec.put(TX_METHOD.MVRCD.toString(), mvrcd);
+                       mvrcd.put(TfUtils.JSON_ATTRS, toJSONArray(mvrcdList));
+                       outputSpec.put(TfUtils.TXMETHOD_MVRCD, mvrcd);
                }
                
                // return output spec with IDs
@@ -635,11 +634,11 @@ public class DataTransform
                MapReduceTool.renameFileOnHDFS(tmpPath + "/" + 
TfUtils.TXMTD_DC_COLNAMES, txMtdPath + "/" + TfUtils.TXMTD_DC_COLNAMES);
                MapReduceTool.renameFileOnHDFS(tmpPath + "/" + 
TfUtils.TXMTD_COLTYPES, txMtdPath + "/" + TfUtils.TXMTD_COLTYPES);
                
-               if ( fs.exists(new Path(tmpPath +"/Dummycode/" + 
TransformationAgent.DCD_FILE_NAME)) ) 
+               if ( fs.exists(new Path(tmpPath +"/Dummycode/" + 
TfUtils.DCD_FILE_NAME)) ) 
                {
                        if ( !fs.exists( new Path(txMtdPath + "/Dummycode/") )) 
                                fs.mkdirs(new Path(txMtdPath + "/Dummycode/"));
-                       MapReduceTool.renameFileOnHDFS( tmpPath + "/Dummycode/" 
+ TransformationAgent.DCD_FILE_NAME, txMtdPath + "/Dummycode/" + 
TransformationAgent.DCD_FILE_NAME);
+                       MapReduceTool.renameFileOnHDFS( tmpPath + "/Dummycode/" 
+ TfUtils.DCD_FILE_NAME, txMtdPath + "/Dummycode/" + TfUtils.DCD_FILE_NAME);
                }
        }
        
@@ -666,17 +665,17 @@ public class DataTransform
                br.close();
                
                // fetch relevant attribute lists
-               if ( !spec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
+               if ( !spec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) )
                        return ret;
                
-               JSONArray dcdList = (JSONArray) 
((JSONObject)spec.get(TX_METHOD.DUMMYCODE.toString())).get(TransformationAgent.JSON_ATTRS);
+               JSONArray dcdList = (JSONArray) 
((JSONObject)spec.get(TfUtils.TXMETHOD_DUMMYCODE)).get(TfUtils.JSON_ATTRS);
 
                // look for numBins among binned columns
                for(Object o : dcdList) 
                {
                        int id = UtilFunctions.toInt(o);
                        
-                       Path binpath = new Path( tfMtdPath + "/Bin/" + 
UtilFunctions.unquote(columnNames[id-1]) + TransformationAgent.BIN_FILE_SUFFIX);
+                       Path binpath = new Path( tfMtdPath + "/Bin/" + 
UtilFunctions.unquote(columnNames[id-1]) + TfUtils.BIN_FILE_SUFFIX);
                        Path rcdpath = new Path( tfMtdPath + "/Recode/" + 
UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_RCD_DISTINCT_SUFFIX);
                        
                        if ( TfUtils.checkValidInputFile(fs, binpath, false ) )
@@ -1063,7 +1062,7 @@ public class DataTransform
                        ret = new MatrixBlock(input.getNumRows(), 
input.getNumColumns(), false);
                        Iterator<String[]> iter = input.getStringRowIterator();
                        for( int i=0; iter.hasNext(); i++ ) {
-                               String[] tmp = agents.apply(iter.next(), true);
+                               String[] tmp = agents.apply(iter.next());
                                for( int j=0; j<tmp.length; j++ )
                                        ret.appendValue(i, j, 
UtilFunctions.parseToDouble(tmp[j]));
                        }
@@ -1227,7 +1226,7 @@ public class DataTransform
                        _ba.loadTxMtd(job, fs, tmp, agents);
                        
                        _da.setRecodeMapsCP( _ra.getCPRecodeMaps() );
-                       _da.setNumBins(_ba.getBinList(), _ba.getNumBins());
+                       _da.setNumBins(_ba.getColList(), _ba.getNumBins());
                        _da.loadTxMtd(job, fs, tmp, agents);
                }
                else {
@@ -1247,7 +1246,7 @@ public class DataTransform
                        _ba.loadTxMtd(job, fs, tmp, agents);
                        
                        _da.setRecodeMaps( _ra.getRecodeMaps() );
-                       _da.setNumBins(_ba.getBinList(), _ba.getNumBins());
+                       _da.setNumBins(_ba.getColList(), _ba.getNumBins());
                        _da.loadTxMtd(job, fs, tmp, agents);
                }
                
@@ -1296,7 +1295,7 @@ public class DataTransform
 
                                if(!agents.omit(words))
                                {
-                                       words = agents.apply(words, !isApply);
+                                       words = agents.apply(words);
        
                                        if (isCSV)
                                        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java 
b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
index 2e52657..c35e160 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
@@ -59,8 +59,7 @@ public class DistinctValue implements Writable, Serializable {
                _count = count;
        }
        
-       public DistinctValue(OffsetCount oc) throws CharacterCodingException 
-       {
+       public DistinctValue(OffsetCount oc) throws CharacterCodingException {
                this(oc.filename + "," + oc.fileOffset, oc.count);
        }
        
@@ -70,8 +69,13 @@ public class DistinctValue implements Writable, Serializable 
{
                _count = -1;
        }
        
-       public String getWord() {  return new String( _bytes, 0, _length, 
Charset.forName("UTF-8") ); }
-       public long getCount() { return _count; }
+       public String getWord() {  
+               return new String( _bytes, 0, _length, Charset.forName("UTF-8") 
); 
+       }
+       
+       public long getCount() { 
+               return _count; 
+       }
        
        @Override
        public void write(DataOutput out) throws IOException {
@@ -85,24 +89,17 @@ public class DistinctValue implements Writable, 
Serializable {
        @Override
        public void readFields(DataInput in) throws IOException {
            // read word 
-               int newLength = WritableUtils.readVInt(in);
-           _bytes = new byte[newLength];
-           in.readFully(_bytes, 0, newLength);
-           _length = newLength;
-           if (_length != _bytes.length)
-               System.out.println("ERROR in DistinctValue.readFields()");
+               _length = WritableUtils.readVInt(in);
+           _bytes = new byte[_length];
+           in.readFully(_bytes, 0, _length);
            // read count
            _count = in.readLong();
        }
        
        public OffsetCount getOffsetCount() {
-               OffsetCount oc = new OffsetCount();
                String[] parts = getWord().split(",");
-               oc.filename = parts[0];
-               oc.fileOffset = UtilFunctions.parseToLong(parts[1]);
-               oc.count = getCount();
-               
-               return oc;
+               return new OffsetCount( parts[0],
+                               UtilFunctions.parseToLong(parts[1]),
+                               getCount() );
        }
-       
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
index e623ab5..9ceb368 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
@@ -41,13 +41,13 @@ import org.apache.wink.json4j.JSONObject;
 import com.google.common.base.Functions;
 import com.google.common.collect.Ordering;
 
+import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
-public class DummycodeAgent extends TransformationAgent {      
-       
+public class DummycodeAgent extends Encoder 
+{              
        private static final long serialVersionUID = 5832130477659116489L;
 
-       private int[] _dcdList = null;
        private long numCols = 0;
        
        private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
@@ -59,26 +59,18 @@ public class DummycodeAgent extends TransformationAgent {
        private int[] _dcdColumnMap = null;                     // to help in 
translating between original and dummycoded column IDs
        private long _dummycodedLength = 0;                     // #of columns 
after dummycoded
        
-       DummycodeAgent(int[] list) {
-               _dcdList = list;
+       public DummycodeAgent(int[] list) {
+               super(list);
        }
        
-       DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException {
+       public DummycodeAgent(JSONObject parsedSpec, long ncol) throws 
JSONException {
+               super(null);
                numCols = ncol;
                
-               if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
-                       return;
-               
-               JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.DUMMYCODE.toString());
-               JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-                       
-               _dcdList = new int[attrs.size()];
-               for(int i=0; i < _dcdList.length; i++) 
-                       _dcdList[i] = UtilFunctions.toInt(attrs.get(i));
-       }
-       
-       public int[] dcdList() {
-               return _dcdList;
+               if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) )
+                       return;         
+               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_DUMMYCODE);
+               initColList( (JSONArray)obj.get(TfUtils.JSON_ATTRS) );  
        }
        
        /**
@@ -137,26 +129,26 @@ public class DummycodeAgent extends TransformationAgent {
        public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int 
numCols, TfUtils agents) throws IOException {
                
                // initialize all column types in the transformed data to SCALE
-               ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength];
+               TfUtils.ColumnTypes[] ctypes = new TfUtils.ColumnTypes[(int) 
_dummycodedLength];
                for(int i=0; i < _dummycodedLength; i++)
-                       ctypes[i] = ColumnTypes.SCALE;
+                       ctypes[i] = TfUtils.ColumnTypes.SCALE;
                
                _dcdColumnMap = new int[numCols];
 
-               Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME);
+               Path pt=new Path(txMtdDir+"/Dummycode/" + 
TfUtils.DCD_FILE_NAME);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                
                int sum=1;
                int idx = 0;
                for(int colID=1; colID <= numCols; colID++) 
                {
-                       if ( _dcdList != null && idx < _dcdList.length && 
_dcdList[idx] == colID )
+                       if ( _colList != null && idx < _colList.length && 
_colList[idx] == colID )
                        {
                                br.write(colID + TfUtils.TXMTD_SEP + "1" + 
TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + (sum+_domainSizes[idx]-1) + "\n");
                                _dcdColumnMap[colID-1] = 
(sum+_domainSizes[idx]-1)-1;
 
                                for(int i=sum; i <=(sum+_domainSizes[idx]-1); 
i++)
-                                       ctypes[i-1] = ColumnTypes.DUMMYCODED;
+                                       ctypes[i-1] = 
TfUtils.ColumnTypes.DUMMYCODED;
                                
                                sum += _domainSizes[idx];
                                idx++;
@@ -166,11 +158,11 @@ public class DummycodeAgent extends TransformationAgent {
                                br.write(colID + TfUtils.TXMTD_SEP + "0" + 
TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + sum + "\n");
                                _dcdColumnMap[colID-1] = sum-1;
                                
-                               if ( agents.getBinAgent().isBinned(colID) != -1 
)
-                                       ctypes[sum-1] = ColumnTypes.ORDINAL;    
// binned variable results in an ordinal column
+                               if ( agents.getBinAgent().isApplicable(colID) 
!= -1 )
+                                       ctypes[sum-1] = 
TfUtils.ColumnTypes.ORDINAL;    // binned variable results in an ordinal column
                                
-                               if ( agents.getRecodeAgent().isRecoded(colID) 
!= -1 )
-                                       ctypes[sum-1] = ColumnTypes.NOMINAL;
+                               if ( 
agents.getRecodeAgent().isApplicable(colID) != -1 )
+                                       ctypes[sum-1] = 
TfUtils.ColumnTypes.NOMINAL;
                                
                                sum += 1;
                        }
@@ -181,9 +173,9 @@ public class DummycodeAgent extends TransformationAgent {
                pt=new Path(txMtdDir + File.separator + TfUtils.TXMTD_COLTYPES);
                br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                
-               br.write(columnTypeToID(ctypes[0]) + "");
+               br.write(ctypes[0].toID() + "");
                for(int i = 1; i < _dummycodedLength; i++) 
-                       br.write( TfUtils.TXMTD_SEP + 
columnTypeToID(ctypes[i]));
+                       br.write( TfUtils.TXMTD_SEP + ctypes[i].toID() );
                br.close();
                
                return sum-1;
@@ -211,7 +203,7 @@ public class DummycodeAgent extends TransformationAgent {
        
        public String constructDummycodedHeader(String header, Pattern delim) {
                
-               if(_dcdList == null && _binList == null )
+               if(_colList == null && _binList == null )
                        // none of the columns are dummycoded, simply return 
the given header
                        return header;
                
@@ -223,11 +215,11 @@ public class DummycodeAgent extends TransformationAgent {
                // Dummycoding can be performed on either on a recoded column 
or on a binned column
                
                // process recoded columns
-               if(_finalMapsCP != null && _dcdList != null) 
+               if(_finalMapsCP != null && _colList != null) 
                {
-                       for(int i=0; i <_dcdList.length; i++) 
+                       for(int i=0; i <_colList.length; i++) 
                        {
-                               int colID = _dcdList[i];
+                               int colID = _colList[i];
                                HashMap<String,Long> map = 
_finalMapsCP.get(colID);
                                String colName = 
UtilFunctions.unquote(names[colID-1]);
                                
@@ -242,17 +234,17 @@ public class DummycodeAgent extends TransformationAgent {
                                        for(int idx=0; idx < newNames.size(); 
idx++) 
                                        {
                                                if(idx==0) 
-                                                       sb.append( colName + 
DCD_NAME_SEP + newNames.get(idx));
+                                                       sb.append( colName + 
TfUtils.DCD_NAME_SEP + newNames.get(idx));
                                                else
-                                                       sb.append( delim + 
colName + DCD_NAME_SEP + newNames.get(idx));
+                                                       sb.append( delim + 
colName + TfUtils.DCD_NAME_SEP + newNames.get(idx));
                                        }
                                        names[colID-1] = sb.toString();         
        // replace original column name with dcd name
                                }
                        }
                }
-               else if(_finalMaps != null && _dcdList != null) {
-                       for(int i=0; i <_dcdList.length; i++) {
-                               int colID = _dcdList[i];
+               else if(_finalMaps != null && _colList != null) {
+                       for(int i=0; i <_colList.length; i++) {
+                               int colID = _colList[i];
                                HashMap<String,String> map = 
_finalMaps.get(colID);
                                String colName = 
UtilFunctions.unquote(names[colID-1]);
                                
@@ -272,9 +264,9 @@ public class DummycodeAgent extends TransformationAgent {
                                        for(int idx=0; idx < newNames.size(); 
idx++) 
                                        {
                                                if(idx==0) 
-                                                       sb.append( colName + 
DCD_NAME_SEP + newNames.get(idx));
+                                                       sb.append( colName + 
TfUtils.DCD_NAME_SEP + newNames.get(idx));
                                                else
-                                                       sb.append( delim + 
colName + DCD_NAME_SEP + newNames.get(idx));
+                                                       sb.append( delim + 
colName + TfUtils.DCD_NAME_SEP + newNames.get(idx));
                                        }
                                        names[colID-1] = sb.toString();         
        // replace original column name with dcd name
                                }
@@ -288,7 +280,7 @@ public class DummycodeAgent extends TransformationAgent {
                                int colID = _binList[i];
                                
                                // need to consider only binned and dummycoded 
columns
-                               if(isDummyCoded(colID) == -1)
+                               if(isApplicable(colID) == -1)
                                        continue;
                                
                                int numBins = _numBins[i];
@@ -297,9 +289,9 @@ public class DummycodeAgent extends TransformationAgent {
                                sb.setLength(0);
                                for(int idx=0; idx < numBins; idx++) 
                                        if(idx==0) 
-                                               sb.append( colName + 
DCD_NAME_SEP + "Bin" + (idx+1) );
+                                               sb.append( colName + 
TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) );
                                        else
-                                               sb.append( delim + colName + 
DCD_NAME_SEP + "Bin" + (idx+1) );
+                                               sb.append( delim + colName + 
TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) );
                                names[colID-1] = sb.toString();                 
// replace original column name with dcd name
                        }
                
@@ -319,21 +311,20 @@ public class DummycodeAgent extends TransformationAgent {
        
        @Override
        public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
-               if ( _dcdList == null )
-               {
+               if ( !isApplicable() ) {
                        _dummycodedLength = numCols;
                        return;
                }
                
                // sort to-be dummycoded column IDs in ascending order. This is 
the order in which the new dummycoded record is constructed in apply() function.
-               Arrays.sort(_dcdList);  
-               _domainSizes = new int[_dcdList.length];
+               Arrays.sort(_colList);  
+               _domainSizes = new int[_colList.length];
 
                _dummycodedLength = numCols;
                
                //HashMap<String, String> map = null;
-               for(int i=0; i<_dcdList.length; i++) {
-                       int colID = _dcdList[i];
+               for(int i=0; i<_colList.length; i++) {
+                       int colID = _colList[i];
                        
                        // Find the domain size for colID using _finalMaps or 
_finalMapsCP
                        int domainSize = 0;
@@ -372,26 +363,26 @@ public class DummycodeAgent extends TransformationAgent {
         * @return
         */
        @Override
-       public String[] apply(String[] words, TfUtils agents) {
-               
-               if ( _dcdList == null )
+       public String[] apply(String[] words) 
+       {
+               if( !isApplicable() )
                        return words;
                
                String[] nwords = new String[(int)_dummycodedLength];
-               
                int rcdVal = 0;
                
                for(int colID=1, idx=0, ncolID=1; colID <= words.length; 
colID++) {
-                       if(idx < _dcdList.length && colID==_dcdList[idx]) {
+                       if(idx < _colList.length && colID==_colList[idx]) {
                                // dummycoded columns
                                try {
-                               rcdVal = 
UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
-                               nwords[ ncolID-1+rcdVal-1 ] = "1";
-                               ncolID += _domainSizes[idx];
-                               idx++;
-                               } catch (Exception e) {
-                                       System.out.println("Error in 
dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", 
domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
-                                       throw new RuntimeException(e);
+                                       rcdVal = 
UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
+                                       nwords[ ncolID-1+rcdVal-1 ] = "1";
+                                       ncolID += _domainSizes[idx];
+                                       idx++;
+                               } 
+                               catch (Exception e) {
+                                       throw new RuntimeException("Error in 
dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] 
+                                                       + ", domainSize=" + 
_domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
                                }
                        }
                        else {
@@ -402,27 +393,4 @@ public class DummycodeAgent extends TransformationAgent {
                
                return nwords;
        }
-       
-       /**
-        * Check if the given column ID is subjected to this transformation.
-        * 
-        */
-       public int isDummyCoded(int colID)
-       {
-               if(_dcdList == null)
-                       return -1;
-               
-               int idx = Arrays.binarySearch(_dcdList, colID);
-               return ( idx >= 0 ? idx : -1);
-       }
-       
-       @Override
-       public void print() {
-               System.out.print("Dummycoding List: \n    ");
-               for(int i : _dcdList) {
-                       System.out.print(i + " ");
-               }
-               System.out.println();
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
index 9763403..5cadef6 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
@@ -26,7 +26,6 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -50,10 +49,11 @@ import 
org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
+import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
-public class MVImputeAgent extends TransformationAgent {
-       
+public class MVImputeAgent extends Encoder 
+{      
        private static final long serialVersionUID = 9057868620144662194L;
 
        public static final String MEAN_PREFIX = "mean";
@@ -65,7 +65,6 @@ public class MVImputeAgent extends TransformationAgent {
        
        public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT };
        
-       private int[] _mvList = null;
        /* 
         * Imputation Methods:
         * 1 - global_mean
@@ -94,6 +93,8 @@ public class MVImputeAgent extends TransformationAgent {
        private CM_COV_Object[] _scnomvVarList = null;          // column-level 
variances, computed so far
        
        private String[] _replacementList = null;               // 
replacements: for global_mean, mean; and for global_mode, recode id of mode 
category
+       private String[] _NAstrings = null;
+       
        
        public String[] getReplacements() { return _replacementList; }
        public KahanObject[] getMeans()   { return _meanList; }
@@ -101,46 +102,47 @@ public class MVImputeAgent extends TransformationAgent {
        public KahanObject[] getMeans_scnomv()   { return _scnomvMeanList; }
        public CM_COV_Object[] getVars_scnomv()  { return _scnomvVarList; }
        
-       MVImputeAgent(JSONObject parsedSpec) throws JSONException {
-       
-               boolean isMV = 
parsedSpec.containsKey(TX_METHOD.IMPUTE.toString());
-               boolean isSC = 
parsedSpec.containsKey(TX_METHOD.SCALE.toString());
+       public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings)
+               throws JSONException 
+       {
+               super(null);    
+               boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE);
+               boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE);  
        
+               _NAstrings = NAstrings;
                
                if(!isMV) {
                        // MV Impute is not applicable
-                       _mvList = null;
+                       _colList = null;
                        _mvMethodList = null;
                        _meanList = null;
                        _countList = null;
                        _replacementList = null;
                }
                else {
-                       JSONObject mvobj = (JSONObject) 
parsedSpec.get(TX_METHOD.IMPUTE.toString());
-                       JSONArray mvattrs = (JSONArray) mvobj.get(JSON_ATTRS);
-                       JSONArray mvmthds = (JSONArray) mvobj.get(JSON_MTHD);
+                       JSONObject mvobj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+                       JSONArray mvattrs = (JSONArray) 
mvobj.get(TfUtils.JSON_ATTRS);
+                       JSONArray mvmthds = (JSONArray) 
mvobj.get(TfUtils.JSON_MTHD);
                        int mvLength = mvattrs.size();
                        
-                       assert(mvLength == mvmthds.size());
-                       
-                       _mvList = new int[mvLength];
+                       _colList = new int[mvLength];
                        _mvMethodList = new byte[mvLength];
                        
                        _meanList = new KahanObject[mvLength];
                        _countList = new long[mvLength];
                        _varList = new CM_COV_Object[mvLength];
                        
-                       _isMVScaled = new BitSet(_mvList.length);
+                       _isMVScaled = new BitSet(_colList.length);
                        _isMVScaled.clear();
                        
-                       for(int i=0; i < _mvList.length; i++) {
-                               _mvList[i] = 
UtilFunctions.toInt(mvattrs.get(i));
+                       for(int i=0; i < _colList.length; i++) {
+                               _colList[i] = 
UtilFunctions.toInt(mvattrs.get(i));
                                _mvMethodList[i] = (byte) 
UtilFunctions.toInt(mvmthds.get(i)); 
                                _meanList[i] = new KahanObject(0, 0);
                        }
                        
                        _replacementList = new String[mvLength];        // 
contains replacements for all columns (scale and categorical)
                        
-                       JSONArray constants = (JSONArray)mvobj.get(JSON_CONSTS);
+                       JSONArray constants = 
(JSONArray)mvobj.get(TfUtils.JSON_CONSTS);
                        for(int i=0; i < constants.size(); i++) {
                                if ( constants.get(i) == null )
                                        _replacementList[i] = "NaN";
@@ -159,12 +161,12 @@ public class MVImputeAgent extends TransformationAgent {
                }
                else
                {
-                       if ( _mvList != null ) 
-                               _mvscMethodList = new byte[_mvList.length];
+                       if ( _colList != null ) 
+                               _mvscMethodList = new byte[_colList.length];
                        
-                       JSONObject scobj = (JSONObject) 
parsedSpec.get(TX_METHOD.SCALE.toString());
-                       JSONArray scattrs = (JSONArray) scobj.get(JSON_ATTRS);
-                       JSONArray scmthds = (JSONArray) scobj.get(JSON_MTHD);
+                       JSONObject scobj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_SCALE);
+                       JSONArray scattrs = (JSONArray) 
scobj.get(TfUtils.JSON_ATTRS);
+                       JSONArray scmthds = (JSONArray) 
scobj.get(TfUtils.JSON_MTHD);
                        int scLength = scattrs.size();
                        
                        int[] _allscaled = new int[scLength];
@@ -178,7 +180,7 @@ public class MVImputeAgent extends TransformationAgent {
                                _allscaled[i] = colID;
                                
                                // check if the attribute is also MV imputed
-                               int mvidx = isImputed(colID);
+                               int mvidx = isApplicable(colID);
                                if(mvidx != -1)
                                {
                                        _isMVScaled.set(mvidx);
@@ -203,7 +205,7 @@ public class MVImputeAgent extends TransformationAgent {
                                        colID = 
UtilFunctions.toInt(scattrs.get(i));
                                        mthd = 
(byte)UtilFunctions.toInt(scmthds.get(i)); 
                                                        
-                                       if(isImputed(colID) == -1)
+                                       if(isApplicable(colID) == -1)
                                        {       // scaled but not imputed
                                                _scnomvList[idx] = colID;
                                                _scnomvMethodList[idx] = mthd;
@@ -216,17 +218,17 @@ public class MVImputeAgent extends TransformationAgent {
                }
        }
        
-       public void prepare(String[] words, TfUtils agents) throws IOException {
+       public void prepare(String[] words) throws IOException {
                
                try {
                        String w = null;
-                       if(_mvList != null)
-                       for(int i=0; i <_mvList.length; i++) {
-                               int colID = _mvList[i];
+                       if(_colList != null)
+                       for(int i=0; i <_colList.length; i++) {
+                               int colID = _colList[i];
                                w = 
UtilFunctions.unquote(words[colID-1].trim());
                                
                                try {
-                               if(!agents.isNA(w)) {
+                               if(!TfUtils.isNA(_NAstrings, w)) {
                                        _countList[i]++;
                                        
                                        boolean computeMean = (_mvMethodList[i] 
== 1 || _isMVScaled.get(i) );
@@ -421,9 +423,9 @@ public class MVImputeAgent extends TransformationAgent {
                        StringBuilder sb = new StringBuilder();
                        DistinctValue dv = null;
                        
-                       if(_mvList != null)
-                               for(int i=0; i < _mvList.length; i++) {
-                                       int colID = _mvList[i];
+                       if(_colList != null)
+                               for(int i=0; i < _colList.length; i++) {
+                                       int colID = _colList[i];
                                        IntWritable iw = new 
IntWritable(-colID);
                                        
                                        dv = prepMeanOutput(taskID, i, sb, 
false);                              outDV(iw, dv, out);
@@ -476,9 +478,9 @@ public class MVImputeAgent extends TransformationAgent {
                        StringBuilder sb = new StringBuilder();
                        DistinctValue dv = null;
                        
-                       if(_mvList != null)
-                               for(int i=0; i < _mvList.length; i++) {
-                                       int colID = _mvList[i];
+                       if(_colList != null)
+                               for(int i=0; i < _colList.length; i++) {
+                                       int colID = _colList[i];
                                        Integer iw = -colID;
                                        
                                        dv = prepMeanOutput(taskID, i, sb, 
false);                              addDV(iw, dv, list);
@@ -516,7 +518,7 @@ public class MVImputeAgent extends TransformationAgent {
        
        private void writeTfMtd(int colID, String mean, String tfMtdDir, 
FileSystem fs, TfUtils agents) throws IOException 
        {
-               Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + 
MV_FILE_SUFFIX);
+               Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + 
TfUtils.MV_FILE_SUFFIX);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                br.write(colID + TfUtils.TXMTD_SEP + mean + "\n");
                br.close();
@@ -524,7 +526,7 @@ public class MVImputeAgent extends TransformationAgent {
        
        private void writeTfMtd(int colID, String mean, String sdev, String 
tfMtdDir, FileSystem fs, TfUtils agents) throws IOException 
        {
-               Path pt=new Path(tfMtdDir+"/Scale/"+ agents.getName(colID) + 
SCALE_FILE_SUFFIX);
+               Path pt=new Path(tfMtdDir+"/Scale/"+ agents.getName(colID) + 
TfUtils.SCALE_FILE_SUFFIX);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                br.write(colID + TfUtils.TXMTD_SEP + mean + TfUtils.TXMTD_SEP + 
sdev + "\n");
                br.close();
@@ -532,7 +534,7 @@ public class MVImputeAgent extends TransformationAgent {
        
        private void writeTfMtd(int colID, String min, String max, String 
binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws 
IOException 
        {
-               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
BIN_FILE_SUFFIX);
+               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + 
TfUtils.BIN_FILE_SUFFIX);
                BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + 
max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n");
                br.close();
@@ -541,9 +543,9 @@ public class MVImputeAgent extends TransformationAgent {
        public void outputTransformationMetadata(String outputDir, FileSystem 
fs, TfUtils agents) throws IOException {
                
                try{
-                       if (_mvList != null)
-                               for(int i=0; i < _mvList.length; i++) {
-                                       int colID = _mvList[i];
+                       if (_colList != null)
+                               for(int i=0; i < _colList.length; i++) {
+                                       int colID = _colList[i];
                                        
                                        double imputedValue = Double.NaN;
                                        KahanObject gmean = null;
@@ -770,11 +772,11 @@ public class MVImputeAgent extends TransformationAgent {
                {
                        try {
                                if( totalValidCount != totalRecordCount) {
-                                       // In the presense of missing values, 
the variance needs to be adjusted.
+                                       // In the presence of missing values, 
the variance needs to be adjusted.
                                        // The mean does not need to be 
adjusted, when mv impute method is global_mean, 
                                        // since missing values themselves are 
replaced with gmean.
                                        long totalMissingCount = 
(totalRecordCount-totalValidCount);
-                                       int idx = isImputed(colID);
+                                       int idx = isApplicable(colID);
                                        if(idx != -1 && _mvMethodList[idx] == 
3) 
                                                _meanFn.execute(gmean, 
UtilFunctions.parseToDouble(_replacementList[idx]), totalRecordCount);
                                        _varFn.execute(gcm, gmean._sum, 
totalMissingCount);
@@ -797,7 +799,7 @@ public class MVImputeAgent extends TransformationAgent {
 
        private String readReplacement(int colID, FileSystem fs, Path  
txMtdDir, TfUtils agents) throws IOException
        {
-               Path path = new Path( txMtdDir + "/Impute/" + 
agents.getName(colID) + MV_FILE_SUFFIX);
+               Path path = new Path( txMtdDir + "/Impute/" + 
agents.getName(colID) + TfUtils.MV_FILE_SUFFIX);
                TfUtils.checkValidInputFile(fs, path, true); 
                
                BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
@@ -810,7 +812,7 @@ public class MVImputeAgent extends TransformationAgent {
        
        public String readScaleLine(int colID, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException
        {
-               Path path = new Path( txMtdDir + "/Scale/" + 
agents.getName(colID) + SCALE_FILE_SUFFIX);
+               Path path = new Path( txMtdDir + "/Scale/" + 
agents.getName(colID) + TfUtils.SCALE_FILE_SUFFIX);
                TfUtils.checkValidInputFile(fs, path, true); 
                BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
                String line = br.readLine();
@@ -846,9 +848,9 @@ public class MVImputeAgent extends TransformationAgent {
                if(fs.isDirectory(tfMtdDir)) {
                        
                        // Load information about missing value imputation
-                       if (_mvList != null)
-                               for(int i=0; i<_mvList.length;i++) {
-                                       int colID = _mvList[i];
+                       if (_colList != null)
+                               for(int i=0; i<_colList.length;i++) {
+                                       int colID = _colList[i];
                                        
                                        if ( _mvMethodList[i] == 1 || 
_mvMethodList[i] == 2 )
                                                // global_mean or global_mode
@@ -862,10 +864,10 @@ public class MVImputeAgent extends TransformationAgent {
                                }
                        
                        // Load scaling information
-                       if(_mvList != null)
-                               for(int i=0; i < _mvList.length; i++)
+                       if(_colList != null)
+                               for(int i=0; i < _colList.length; i++)
                                        if ( _isMVScaled.get(i) ) 
-                                               processScalingFile(i, _mvList, 
_meanList, _varList, fs, tfMtdDir, agents);
+                                               processScalingFile(i, _colList, 
_meanList, _varList, fs, tfMtdDir, agents);
                        
                        if(_scnomvList != null)
                                for(int i=0; i < _scnomvList.length; i++)
@@ -884,21 +886,21 @@ public class MVImputeAgent extends TransformationAgent {
         * @return
         */
        @Override
-       public String[] apply(String[] words, TfUtils agents) {
-               
-               if ( _mvList != null)
-               for(int i=0; i < _mvList.length; i++) {
-                       int colID = _mvList[i];
-                       String w = UtilFunctions.unquote(words[colID-1]);
-                       if(agents.isNA(w))
-                               w = words[colID-1] = _replacementList[i];
-                       
-                       if ( _isMVScaled.get(i) )
-                               if ( _mvscMethodList[i] == 1 )
-                                       words[colID-1] = Double.toString( 
UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
-                               else
-                                       words[colID-1] = Double.toString( 
(UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum );
-               }
+       public String[] apply(String[] words) 
+       {       
+               if( isApplicable() )
+                       for(int i=0; i < _colList.length; i++) {
+                               int colID = _colList[i];
+                               String w = 
UtilFunctions.unquote(words[colID-1]);
+                               if(TfUtils.isNA(_NAstrings, w))
+                                       w = words[colID-1] = 
_replacementList[i];
+                               
+                               if ( _isMVScaled.get(i) )
+                                       if ( _mvscMethodList[i] == 1 )
+                                               words[colID-1] = 
Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
+                                       else
+                                               words[colID-1] = 
Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / 
_varList[i].mean._sum );
+                       }
                
                if(_scnomvList != null)
                for(int i=0; i < _scnomvList.length; i++)
@@ -913,23 +915,8 @@ public class MVImputeAgent extends TransformationAgent {
                return words;
        }
        
-       /**
-        * Check if the given column ID is subjected to this transformation.
-        * 
-        */
-       public int isImputed(int colID)
-       {
-               if(_mvList == null)
-                       return -1;
-               
-               int idx = Arrays.binarySearch(_mvList, colID);
-               return ( idx >= 0 ? idx : -1);
-       }
-       
-       public MVMethod getMethod(int colID) 
-       {
-               int idx = isImputed(colID);
-               
+       public MVMethod getMethod(int colID) {
+               int idx = isApplicable(colID);          
                if(idx == -1)
                        return MVMethod.INVALID;
                
@@ -943,35 +930,13 @@ public class MVImputeAgent extends TransformationAgent {
                
        }
        
-       public long getNonMVCount(int colID) 
-       {
-               int idx = isImputed(colID);
-               if(idx == -1)
-                       return 0;
-               else
-                       return _countList[idx];
+       public long getNonMVCount(int colID) {
+               int idx = isApplicable(colID);
+               return (idx == -1) ? 0 : _countList[idx];
        }
        
-       public String getReplacement(int colID) 
-       {
-               int idx = isImputed(colID);
-               
-               if(idx == -1)
-                       return null;
-               else
-                       return _replacementList[idx];
-       }
-       
-       public void print() {
-               System.out.print("MV Imputation List: \n    ");
-               for(int i : _mvList) {
-                       System.out.print(i + " ");
-               }
-               System.out.print("\n    ");
-               for(byte b : _mvMethodList) {
-                       System.out.print(b + " ");
-               }
-               System.out.println();
+       public String getReplacement(int colID)  {
+               int idx = isApplicable(colID);          
+               return (idx == -1) ? null : _replacementList[idx];
        }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
index bd2feb3..c169129 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
@@ -20,7 +20,6 @@
 package org.apache.sysml.runtime.transform;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -31,71 +30,43 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
-
+import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
-public class OmitAgent extends TransformationAgent {
-       
+public class OmitAgent extends Encoder 
+{      
        private static final long serialVersionUID = 1978852120416654195L;
 
-       private int[] _omitList = null;
-
-       OmitAgent() { }
+       public OmitAgent() { 
+               super(null);
+       }
        
-       OmitAgent(int[] list) {
-               _omitList = list;
+       public OmitAgent(int[] list) {
+               super(list);
        }
        
-       public OmitAgent(JSONObject parsedSpec) throws JSONException {
-               if (!parsedSpec.containsKey(TX_METHOD.OMIT.toString()))
+       public OmitAgent(JSONObject parsedSpec) 
+               throws JSONException 
+       {
+               super(null);
+               if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT))
                        return;
-               JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.OMIT.toString());
-               JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-               
-               _omitList = new int[attrs.size()];
-               for(int i=0; i < _omitList.length; i++) 
-                       _omitList[i] = UtilFunctions.toInt(attrs.get(i));
+               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_OMIT);
+               initColList((JSONArray) obj.get(TfUtils.JSON_ATTRS));
        }
        
        public boolean omit(String[] words, TfUtils agents) 
        {
-               if(_omitList == null)
+               if( !isApplicable() )
                        return false;
                
-               for(int i=0; i<_omitList.length; i++) 
-               {
-                       int colID = _omitList[i];
-                       
if(agents.isNA(UtilFunctions.unquote(words[colID-1].trim())))
+               for(int i=0; i<_colList.length; i++) {
+                       int colID = _colList[i];
+                       
if(TfUtils.isNA(agents.getNAStrings(),UtilFunctions.unquote(words[colID-1].trim())))
                                return true;
                }
                return false;
        }
-       
-       public boolean isApplicable() 
-       {
-               return (_omitList != null);
-       }
-       
-       /**
-        * Check if the given column ID is subjected to this transformation.
-        * 
-        */
-       public int isOmitted(int colID)
-       {
-               if(_omitList == null)
-                       return -1;
-               
-               int idx = Arrays.binarySearch(_omitList, colID);
-               return ( idx >= 0 ? idx : -1);
-       }
-
-       @Override
-       public void print() {
-               System.out.print("Omit List: \n    ");
-               for(int i : _omitList) 
-                       System.out.print(i + " ");
-               System.out.println();
-       }
 
        @Override
        public void mapOutputTransformationMetadata(
@@ -115,10 +86,8 @@ public class OmitAgent extends TransformationAgent {
        }
 
        @Override
-       public String[] apply(String[] words, TfUtils agents) {
+       public String[] apply(String[] words) {
                return null;
        }
-
-
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index aa7c22e..1f9749d 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -48,44 +47,42 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.transform.decode.DecoderRecode;
+import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
-public class RecodeAgent extends TransformationAgent {
-       
+public class RecodeAgent extends Encoder 
+{      
        private static final long serialVersionUID = 8213163881283341874L;
 
-       private int[] _rcdList = null;
        private int[] _mvrcdList = null;
        private int[] _fullrcdList = null;
 
        // HashMap< columnID, HashMap<distinctValue, count> >
        private HashMap<Integer, HashMap<String, Long>> _rcdMaps  = new 
HashMap<Integer, HashMap<String, Long>>();
+       private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
        
-       RecodeAgent(JSONObject parsedSpec) throws JSONException {
-               
-               int rcdCount = 0;
-               
-               if ( parsedSpec.containsKey(TX_METHOD.RECODE.toString())) 
+       public RecodeAgent(JSONObject parsedSpec)
+               throws JSONException 
+       {
+               super(null);
+               int rcdCount = 0;       
+               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE)) 
                {
                        //TODO consolidate external and internal json spec 
definitions
                        JSONArray attrs = null;
-                       if( parsedSpec.get(TX_METHOD.RECODE.toString()) 
instanceof JSONObject ) {
-                               JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.RECODE.toString());
-                               attrs = (JSONArray) obj.get(JSON_ATTRS);
+                       if( parsedSpec.get(TfUtils.TXMETHOD_RECODE) instanceof 
JSONObject ) {
+                               JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_RECODE);
+                               attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
                        }
                        else
-                               attrs = 
(JSONArray)parsedSpec.get(TX_METHOD.RECODE.toString());
-                       
-                       _rcdList = new int[attrs.size()];
-                       for(int i=0; i < _rcdList.length; i++) 
-                               _rcdList[i] = UtilFunctions.toInt(attrs.get(i));
-                       rcdCount = _rcdList.length;
+                               attrs = 
(JSONArray)parsedSpec.get(TfUtils.TXMETHOD_RECODE);                     
+                       rcdCount = initColList(attrs);
                }
                
-               if ( parsedSpec.containsKey(TX_METHOD.MVRCD.toString())) 
+               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) 
                {
-                       JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.MVRCD.toString());
-                       JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+                       JSONObject obj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_MVRCD);
+                       JSONArray attrs = (JSONArray) 
obj.get(TfUtils.JSON_ATTRS);
                        
                        _mvrcdList = new int[attrs.size()];
                        for(int i=0; i < _mvrcdList.length; i++) 
@@ -97,9 +94,9 @@ public class RecodeAgent extends TransformationAgent {
                {
                        _fullrcdList = new int[rcdCount];
                        int idx = -1;
-                       if(_rcdList != null)
-                               for(int i=0; i < _rcdList.length; i++)
-                                       _fullrcdList[++idx] = _rcdList[i]; 
+                       if(_colList != null)
+                               for(int i=0; i < _colList.length; i++)
+                                       _fullrcdList[++idx] = _colList[i]; 
                        
                        if(_mvrcdList != null)
                                for(int i=0; i < _mvrcdList.length; i++)
@@ -114,8 +111,8 @@ public class RecodeAgent extends TransformationAgent {
         * @param frame
         */
        public void initRecodeMaps( FrameBlock frame ) {
-               for( int j=0; j<_rcdList.length; j++ ) {
-                       int colID = _rcdList[j]; //1-based
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
                        HashMap<String,Long> map = new HashMap<String,Long>();
                        for( int i=0; i<frame.getNumRows(); i++ ) {
                                String[] tmp = frame.get(i, 
colID-1).toString().split(Lop.DATATYPE_PREFIX);
@@ -125,8 +122,16 @@ public class RecodeAgent extends TransformationAgent {
                }
        }
        
+       public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { 
+               return _rcdMaps; 
+       }
+       
+       public HashMap<Integer, HashMap<String,String>> getRecodeMaps() {
+               return _finalMaps;
+       }
+       
        void prepare(String[] words, TfUtils agents) {
-               if ( _rcdList == null && _mvrcdList == null )
+               if ( _colList == null && _mvrcdList == null )
                        return;
                
                String w = null;
@@ -183,7 +188,7 @@ public class RecodeAgent extends TransformationAgent {
        }
        
        public void mapOutputHelper(int taskID, OutputCollector<IntWritable, 
DistinctValue> out, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils 
agents) throws IOException {
-               if ( _rcdList == null  && _mvrcdList == null )
+               if ( _colList == null  && _mvrcdList == null )
                        return;
                
                try 
@@ -241,7 +246,7 @@ public class RecodeAgent extends TransformationAgent {
                int rcdIndex = 0, modeIndex = 0;
                long maxCount = Long.MIN_VALUE;
                
-               boolean isRecoded = (isRecoded(colID) != -1);
+               boolean isRecoded = (isApplicable(colID) != -1);
                boolean isModeImputed = (mvagent.getMethod(colID) == 
MVMethod.GLOBAL_MODE);
                
                Path pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) + 
TfUtils.TXMTD_RCD_MAP_SUFFIX);
@@ -293,7 +298,7 @@ public class RecodeAgent extends TransformationAgent {
                if ( isRecoded ) 
                {
                        // output mode
-                       pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) 
+ MODE_FILE_SUFFIX);
+                       pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) 
+ TfUtils.MODE_FILE_SUFFIX);
                        br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                        br.write(UtilFunctions.quote(mode) + "," + modeIndex + 
"," + maxCount );
                        br.close();
@@ -307,7 +312,7 @@ public class RecodeAgent extends TransformationAgent {
                
                if (isModeImputed) 
                {
-                       pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) 
+ MV_FILE_SUFFIX);
+                       pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) 
+ TfUtils.MV_FILE_SUFFIX);
                        br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
                        br.write(colID + "," + UtilFunctions.quote(mode));
                        br.close();
@@ -316,7 +321,7 @@ public class RecodeAgent extends TransformationAgent {
        }
        
        public void outputTransformationMetadata(String outputDir, FileSystem 
fs, TfUtils agents) throws IOException {
-               if(_rcdList == null && _mvrcdList == null )
+               if(_colList == null && _mvrcdList == null )
                        return;
                
                for(int i=0; i<_fullrcdList.length; i++) {
@@ -356,15 +361,6 @@ public class RecodeAgent extends TransformationAgent {
                writeMetadata(map, outputDir, colID, fs, agents, false);
        }
        
-       // 
------------------------------------------------------------------------------------------------
-       
-       public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { 
return _rcdMaps; }
-       
-       HashMap<Integer, HashMap<String,String>> _finalMaps = null;
-       public HashMap<Integer, HashMap<String,String>> getRecodeMaps() {
-               return _finalMaps;
-       }
-       
        /**
         * Method to load recode maps of all attributes, at once.
         * 
@@ -373,14 +369,14 @@ public class RecodeAgent extends TransformationAgent {
         */
        @Override
        public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
-               if ( _rcdList == null )
+               if( !isApplicable() )
                        return;
                
                _finalMaps = new HashMap<Integer, HashMap<String, String>>();
        
                if(fs.isDirectory(txMtdDir)) {
-                       for(int i=0; i<_rcdList.length;i++) {
-                               int colID = _rcdList[i];
+                       for(int i=0; i<_colList.length;i++) {
+                               int colID = _colList[i];
                                
                                Path path = new Path( txMtdDir + "/Recode/" + 
agents.getName(colID) + TfUtils.TXMTD_RCD_MAP_SUFFIX);
                                TfUtils.checkValidInputFile(fs, path, true); 
@@ -404,22 +400,7 @@ public class RecodeAgent extends TransformationAgent {
                        fs.close();
                        throw new RuntimeException("Path to recode maps must be 
a directory: " + txMtdDir);
                }
-       }
-       
-       /**
-        * Check if the given column ID is subjected to this transformation.
-        * 
-        */
-       public int isRecoded(int colID)
-       {
-               if(_rcdList == null)
-                       return -1;
-               
-               int idx = Arrays.binarySearch(_rcdList, colID);
-               return ( idx >= 0 ? idx : -1);
-       }
-
-       
+       }       
 
        /**
         * Method to apply transformations.
@@ -428,16 +409,17 @@ public class RecodeAgent extends TransformationAgent {
         * @return
         */
        @Override
-       public String[] apply(String[] words, TfUtils agents) {
-               if ( _rcdList == null )
+       public String[] apply(String[] words) 
+       {
+               if( !isApplicable() )
                        return words;
                
                //apply recode maps on relevant columns of given row
-               for(int i=0; i < _rcdList.length; i++) {
+               for(int i=0; i < _colList.length; i++) {
                        //prepare input and get code
-                       int colID = _rcdList[i];
+                       int colID = _colList[i];
                        String key = 
UtilFunctions.unquote(words[colID-1].trim());
-                       String val = _finalMaps.get(colID).get(key);            
        
+                       String val = lookupRCDMap(colID, key);                  
                        // replace unseen keys with NaN 
                        words[colID-1] = (val!=null) ? val : "NaN";
                }
@@ -447,43 +429,17 @@ public class RecodeAgent extends TransformationAgent {
        
        /**
         * 
-        * @param words
-        * @param agents
+        * @param colID
+        * @param key
         * @return
         */
-       public String[] cp_apply(String[] words, TfUtils agents) {
-               if ( _rcdList == null )
-                       return words;
-               
-               //apply recode maps on relevant columns of given row
-               for(int i=0; i < _rcdList.length; i++) {
-                       //prepare input and get code
-                       int colID = _rcdList[i];
-                       String key = 
UtilFunctions.unquote(words[colID-1].trim());
-                       Long val = _rcdMaps.get(colID).get(key);                
        
-                       // replace unseen keys with NaN
-                       words[colID-1] = (val!=null) ? Long.toString(val) : 
"NaN";
-               }
-                       
-               return words;
-       }
-       
-       public void printMaps() {
-               for(Integer k : _rcdMaps.keySet()) {
-                       System.out.println("Column " + k);
-                       HashMap<String,Long> map = _rcdMaps.get(k);
-                       for(String w : map.keySet()) {
-                               System.out.println("    " + w + " : " + 
map.get(w));
-                       }
-               }
-       }
-       
-       public void print() {
-               System.out.print("Recoding List: \n    ");
-               for(int i : _rcdList) {
-                       System.out.print(i + " ");
+       private String lookupRCDMap(int colID, String key) {
+               if( _finalMaps!=null )
+                       return _finalMaps.get(colID).get(key);
+               else { //used for cp
+                       Long tmp = _rcdMaps.get(colID).get(key);
+                       return (tmp!=null) ? Long.toString(tmp) : null;
                }
-               System.out.println();
        }
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java 
b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index bc08a3a..16ccf6e 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -53,6 +53,36 @@ public class TfUtils implements Serializable{
        
        private static final long serialVersionUID = 526252850872633125L;
 
+       protected enum ColumnTypes { 
+               SCALE, 
+               NOMINAL, 
+               ORDINAL, 
+               DUMMYCODED, 
+               INVALID;
+       
+               protected byte toID() { 
+                       switch(this) {
+                               case SCALE: return 1;
+                               case NOMINAL: return 2;
+                               case ORDINAL: return 3;
+                               // Ideally, dummycoded columns should be of a 
different type. Treating them as SCALE is incorrect, semantically.
+                               case DUMMYCODED: return 1; 
+                               default:
+                                       throw new RuntimeException("Invalid 
Column Type: " + this);
+                       }
+               }       
+       }
+       
+       //transform methods
+       public static final String TXMETHOD_IMPUTE    = "impute";
+       public static final String TXMETHOD_RECODE    = "recode";
+       public static final String TXMETHOD_BIN       = "bin";
+       public static final String TXMETHOD_DUMMYCODE = "dummycode";
+       public static final String TXMETHOD_SCALE     = "scale";
+       public static final String TXMETHOD_OMIT      = "omit";
+       public static final String TXMETHOD_MVRCD     = "mvrcd";
+               
+       //transform meta data constants
        public static final String TXMTD_SEP         = ",";
        public static final String TXMTD_COLTYPES        = "coltypes.csv";      
        public static final String TXMTD_COLNAMES    = "column.names";
@@ -60,6 +90,17 @@ public class TfUtils implements Serializable{
        public static final String TXMTD_RCD_MAP_SUFFIX          = ".map";
        public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct";
        
+       public static final String JSON_ATTRS   = "attributes"; 
+       public static final String JSON_MTHD    = "methods"; 
+       public static final String JSON_CONSTS = "constants"; 
+       public static final String JSON_NBINS   = "numbins";            
+       protected static final String MV_FILE_SUFFIX            = ".impute";
+       protected static final String MODE_FILE_SUFFIX          = ".mode";
+       protected static final String BIN_FILE_SUFFIX           = ".bin";
+       protected static final String SCALE_FILE_SUFFIX         = ".scale";
+       protected static final String DCD_FILE_NAME             = 
"dummyCodeMaps.csv";  
+       protected static final String DCD_NAME_SEP      = "_";
+       
        
        private OmitAgent _oa = null;
        private MVImputeAgent _mia = null;
@@ -136,7 +177,7 @@ public class TfUtils implements Serializable{
        {
                //TODO recodemaps handover
                _numInputCols = inNcol;
-               createAgents(spec);
+               createAgents(spec, new String[]{});
        }
        
        protected static boolean checkValidInputFile(FileSystem fs, Path path, 
boolean err)
@@ -200,9 +241,11 @@ public class TfUtils implements Serializable{
                return 
parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
        }
        
-       private void createAgents(JSONObject spec) throws IOException, 
JSONException {
+       private void createAgents(JSONObject spec, String[] naStrings) 
+               throws IOException, JSONException 
+       {
                _oa = new OmitAgent(spec);
-               _mia = new MVImputeAgent(spec);
+               _mia = new MVImputeAgent(spec, naStrings);
                _ra = new RecodeAgent(spec);
                _ba = new BinAgent(spec);
                _da = new DummycodeAgent(spec, _numInputCols);
@@ -242,7 +285,7 @@ public class TfUtils implements Serializable{
                _outputPath = outputPath;
                
                parseColumnNames();             
-               createAgents(spec);
+               createAgents(spec, naStrings);
        }
        
        public void incrValid() { _numValidRecords++; }
@@ -282,25 +325,23 @@ public class TfUtils implements Serializable{
         * @param w
         * @return
         */
-       public boolean isNA(String w) {
-               if(_NAstrings == null)
+       public static boolean isNA(String[] NAstrings, String w) {
+               if(NAstrings == null)
                        return false;
                
-               for(String na : _NAstrings) {
+               for(String na : NAstrings) {
                        if(w.equals(na))
                                return true;
                }
                return false;
        }
        
-       public String[] getWords(Text line)
-       {
+       public String[] getWords(Text line) {
                return getWords(line.toString());
        }
        
 
-       public String[] getWords(String line) 
-       {
+       public String[] getWords(String line) {
                return getDelim().split(line.trim(), -1);
        }
        
@@ -315,10 +356,10 @@ public class TfUtils implements Serializable{
                String[] words = getWords(line);
                if(!getOmitAgent().omit(words, this))
                {
-                       getMVImputeAgent().prepare(words, this);
+                       getMVImputeAgent().prepare(words);
                        getRecodeAgent().prepare(words, this);
                        getBinAgent().prepare(words, this);
-                       incrValid();;
+                       incrValid();
                }
                incrTotal();
                
@@ -354,30 +395,10 @@ public class TfUtils implements Serializable{
                // associate recode maps and bin definitions with dummycoding 
agent,
                // as recoded and binned columns are typically dummycoded
                getDummycodeAgent().setRecodeMaps( 
getRecodeAgent().getRecodeMaps() );
-               getDummycodeAgent().setNumBins(getBinAgent().getBinList(), 
getBinAgent().getNumBins());
+               getDummycodeAgent().setNumBins(getBinAgent().getColList(), 
getBinAgent().getNumBins());
                getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
 
        }
-       
-       /*public void loadTfMetadata () throws IOException
-       {
-               Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0];
-               FileSystem localFS = FileSystem.getLocal(_rJob);
-               
-               loadTfMetadata(_rJob, localFS, tfMtdDir);
-               
-               FileSystem fs;
-               fs = FileSystem.get(_rJob);
-               Path thisPath=new 
Path(_rJob.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
-               String thisfile=thisPath.toString();
-                       
-               Path smallestFilePath=new 
Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
-               if(thisfile.toString().equals(smallestFilePath.toString()))
-                       _partFileWithHeader=true;
-               else
-                       _partFileWithHeader = false;
-       }*/
-
 
        public String processHeaderLine() throws IOException 
        {
@@ -402,11 +423,6 @@ public class TfUtils implements Serializable{
                return getOmitAgent().omit(words, this);
        }
        
-       
-       public String[] apply(String[] words) {
-               return apply(words, false);
-       }
-       
        /**
         * Function to apply transformation metadata on a given row.
         * 
@@ -414,18 +430,11 @@ public class TfUtils implements Serializable{
         * @param optimizeMaps
         * @return
         */
-       public String[] apply ( String[] words, boolean optimizeMaps ) 
-       {
-               words = getMVImputeAgent().apply(words, this);
-               
-               if(optimizeMaps)
-                       // specific case of transform() invoked from CP (to 
save boxing and unboxing)
-                       words = getRecodeAgent().cp_apply(words, this);
-               else
-                       words = getRecodeAgent().apply(words, this);
-
-               words = getBinAgent().apply(words, this);
-               words = getDummycodeAgent().apply(words, this);         
+       public String[] apply( String[] words ) {
+               words = getMVImputeAgent().apply(words);
+               words = getRecodeAgent().apply(words);
+               words = getBinAgent().apply(words);
+               words = getDummycodeAgent().apply(words);               
                _numTransformedRows++;
                
                return words;
@@ -443,8 +452,7 @@ public class TfUtils implements Serializable{
                }
        }
        
-       public String checkAndPrepOutputString(String []words) throws 
DMLRuntimeException 
-       {
+       public String checkAndPrepOutputString(String []words) throws 
DMLRuntimeException {
                return checkAndPrepOutputString(words, new StringBuilder());
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
deleted file mode 100644
index 0cdf682..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-
-public abstract class TransformationAgent implements Serializable {
-       
-       private static final long serialVersionUID = -2995384194257356337L;
-       
-       public static enum TX_METHOD { 
-               IMPUTE ("impute"), 
-               RECODE ("recode"), 
-               BIN ("bin"), 
-               DUMMYCODE ("dummycode"), 
-               SCALE ("scale"),
-               OMIT ("omit"),
-               MVRCD ("mvrcd");
-               
-               private String _name;
-               
-               TX_METHOD(String name) { _name = name; }
-               
-               public String toString() {
-                       return _name;
-               }
-       }
-       
-       public static final String JSON_ATTRS   = "attributes"; 
-       public static final String JSON_MTHD    = "methods"; 
-       public static final String JSON_CONSTS = "constants"; 
-       public static final String JSON_NBINS   = "numbins";    
-       
-       protected static final String MV_FILE_SUFFIX            = ".impute";
-       protected static final String MODE_FILE_SUFFIX          = ".mode";
-       protected static final String BIN_FILE_SUFFIX           = ".bin";
-       protected static final String SCALE_FILE_SUFFIX         = ".scale";
-       protected static final String DCD_FILE_NAME             = 
"dummyCodeMaps.csv";
-       
-       protected static final String DCD_NAME_SEP      = "_";
-       
-       
-       abstract public void print();
-       abstract public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException;
-       abstract public void 
mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String 
outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
-       
-       abstract public void loadTxMtd(JobConf job, FileSystem fs, Path 
txMtdDir, TfUtils agents) throws IOException;
-       abstract public String[] apply(String[] words, TfUtils agents);
-       
-       protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, 
INVALID }
-       protected byte columnTypeToID(ColumnTypes type) throws IOException { 
-               switch(type) 
-               {
-               case SCALE: return 1;
-               case NOMINAL: return 2;
-               case ORDINAL: return 3;
-               case DUMMYCODED: return 1; // Ideally, dummycoded columns 
should be of a different type. Treating them as SCALE is incorrect, 
semantically.
-               default:
-                       throw new IOException("Invalid Column Type: " + type);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
index 1999f10..f66ed5e 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
@@ -44,8 +44,9 @@ public abstract class Decoder
         * 
         * @param in
         * @param out
+        * @return
         */
-       public abstract void decode(double[] in, Object[] out);
+       public abstract Object[] decode(double[] in, Object[] out);
        
        /**
         * Block decode API converting a matrix block into a frame block.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
index 4ad11ed..b0b0909 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
@@ -47,9 +47,10 @@ public class DecoderComposite extends Decoder
        }
 
        @Override
-       public void decode(double[] in, Object[] out) {
+       public Object[] decode(double[] in, Object[] out) {
                for( Decoder decoder : _decoders )
                        decoder.decode(in, out);
+               return out;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
index e6560c6..cdf9a52 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
@@ -26,8 +26,7 @@ import java.util.List;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.transform.TransformationAgent;
-import org.apache.sysml.runtime.transform.TransformationAgent.TX_METHOD;
+import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONObject;
@@ -59,14 +58,14 @@ public class DecoderFactory
                        }
                        
                        //create decoder 'recode'
-                       if ( jSpec.containsKey(TX_METHOD.RECODE.toString()))  {
+                       if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE))  {
                                JSONArray attrs = null;
-                               if( jSpec.get(TX_METHOD.RECODE.toString()) 
instanceof JSONObject ) {
-                                       JSONObject obj = (JSONObject) 
jSpec.get(TX_METHOD.RECODE.toString());
-                                       attrs = (JSONArray) 
obj.get(TransformationAgent.JSON_ATTRS);
+                               if( jSpec.get(TfUtils.TXMETHOD_RECODE) 
instanceof JSONObject ) {
+                                       JSONObject obj = (JSONObject) 
jSpec.get(TfUtils.TXMETHOD_RECODE);
+                                       attrs = (JSONArray) 
obj.get(TfUtils.JSON_ATTRS);
                                }
                                else
-                                       attrs = 
(JSONArray)jSpec.get(TX_METHOD.RECODE.toString());
+                                       attrs = 
(JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE);
                                
                                int[] rcCols = new int[attrs.size()];
                                for(int j=0; j<rcCols.length; j++) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
index 0c9a872..f554f8b 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
@@ -61,11 +61,12 @@ public class DecoderRecode extends Decoder
        }
 
        @Override
-       public void decode(double[] in, Object[] out) {
+       public Object[] decode(double[] in, Object[] out) {
                for( int j=0; j<_rcCols.length; j++ ) {
                        long key = UtilFunctions.toLong(in[_rcCols[j]]);
                        out[_rcCols[j]] = _rcMaps[j].get(key);
                }
+               return out;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/d9e0748b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
new file mode 100644
index 0000000..32694c9
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.sysml.runtime.transform.DistinctValue;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONArray;
+
+/**
+ * Base class for all transform encoders providing both a row and block
+ * interface for decoding frames to matrices.
+ * 
+ */
+public abstract class Encoder implements Serializable
+{
+       private static final long serialVersionUID = 2299156350718979064L;
+       
+       protected int[] _colList = null;
+       
+       protected Encoder( int[] colList ) {
+               _colList = colList;
+       }
+       
+       public int[] getColList() {
+               return _colList;
+       }
+       
+       /**
+        * 
+        * @param attrs
+        */
+       public int initColList(JSONArray attrs) {
+               _colList = new int[attrs.size()];
+               for(int i=0; i < _colList.length; i++) 
+                       _colList[i] = UtilFunctions.toInt(attrs.get(i));        
+               return _colList.length;
+       }
+       
+       /**
+        * Indicates if this encoder is applicable, i.e, if there is at 
+        * least one column to encode. 
+        * 
+        * @return
+        */
+       public boolean isApplicable()  {
+               return (_colList != null && _colList.length > 0);
+       }
+       
+       /**
+        * Indicates if this encoder is applicable for the given column ID,
+        * i.e., if it is subject to this transformation.
+        * 
+        */
+       public int isApplicable(int colID) {
+               if(_colList == null)
+                       return -1;
+               int idx = Arrays.binarySearch(_colList, colID);
+               return ( idx >= 0 ? idx : -1);
+       }
+       
+       /**
+        * Encode input data according to existing transform meta
+        * data (transform apply).
+        * 
+        * @param in
+        * @return
+        */
+       public abstract String[] apply(String[] in);
+       
+       
+       //OLD API: kept for a transition phase only
+       //TODO stage 2: refactor data and meta data IO into minimal set of 
ultility functions
+       abstract public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException;
+       abstract public void 
mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String 
outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
+       abstract public void loadTxMtd(JobConf job, FileSystem fs, Path 
txMtdDir, TfUtils agents) throws IOException;
+}


Reply via email to