http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 9e30f5c..7743b61 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -19,40 +19,13 @@
 
 package org.apache.sysml.runtime.transform;
 
-import java.io.EOFException;
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.Lop;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
-@SuppressWarnings("deprecation")
-public class TfUtils implements Serializable{
-       
+public class TfUtils implements Serializable
+{      
        private static final long serialVersionUID = 526252850872633125L;
 
        protected enum ColumnTypes { 
@@ -89,9 +62,7 @@ public class TfUtils implements Serializable{
        
        //transform meta data constants (old file-based transform)
        public static final String TXMTD_SEP         = ",";
-       public static final String TXMTD_COLTYPES    = "coltypes.csv";  
        public static final String TXMTD_COLNAMES    = "column.names";
-       public static final String TXMTD_DC_COLNAMES = "dummycoded.column.names";
        public static final String TXMTD_RCD_MAP_SUFFIX      = ".map";
        public static final String TXMTD_RCD_DISTINCT_SUFFIX = ".ndistinct";
        public static final String TXMTD_BIN_FILE_SUFFIX     = ".bin";
@@ -101,184 +72,14 @@ public class TfUtils implements Serializable{
        public static final String JSON_MTHD    = "methods"; 
        public static final String JSON_CONSTS = "constants"; 
        public static final String JSON_NBINS   = "numbins";            
-       protected static final String MODE_FILE_SUFFIX          = ".mode";
-       protected static final String SCALE_FILE_SUFFIX         = ".scale";
-       protected static final String DCD_FILE_NAME             = "dummyCodeMaps.csv";
-       protected static final String DCD_NAME_SEP      = "_";
-       
-       
-       private OmitAgent _oa = null;
-       private MVImputeAgent _mia = null;
-       private RecodeAgent _ra = null; 
-       private BinAgent _ba = null;
-       private DummycodeAgent _da = null;
-       
-       private long _numRecordsInPartFile;             // Total number of records in the data file
-       private long _numValidRecords;                  // (_numRecordsInPartFile - #of omitted records)
-       private long _numTransformedRows;               // Number of rows after applying transformations
-       private long _numTransformedColumns;    // Number of columns after applying transformations
 
        private String _headerLine = null;
        private boolean _hasHeader;
        private Pattern _delim = null;
        private String _delimString = null;
        private String[] _NAstrings = null;
-       private String[] _outputColumnNames = null;
        private int _numInputCols = -1;
        
-       private String _tfMtdDir = null;
-       private String _spec = null;
-       private String _offsetFile = null;
-       private String _tmpDir = null;
-       private String _outputPath = null;
-       
-       public TfUtils(JobConf job, boolean minimal) 
-               throws IOException, JSONException 
-       {
-               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-                       ConfigurationManager.setCachedJobConf(job);
-               }               
-               _NAstrings = TfUtils.parseNAStrings(job);
-               _spec = job.get(MRJobConfiguration.TF_SPEC);
-               _oa = new OmitAgent(new JSONObject(_spec), null, -1);
-       }
-       
-       // called from GenTFMtdMapper, ApplyTf (Hadoop)
-       public TfUtils(JobConf job) 
-               throws IOException, JSONException 
-       {
-               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-                       ConfigurationManager.setCachedJobConf(job);
-               }
-               
-               boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
-               String[] naStrings = TfUtils.parseNAStrings(job);
-               long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) ); // #cols input data
-               String spec = job.get(MRJobConfiguration.TF_SPEC);
-               String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
-               String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
-               String outputPath = FileOutputFormat.getOutputPath(job).toString();
-               JSONObject jspec = new JSONObject(spec);
-               
-               init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, jspec, numCols, offsetFile, tmpPath, outputPath);
-       }
-       
-       // called from GenTfMtdReducer 
-       public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
-       {
-               this(job);
-               _tfMtdDir = tfMtdDir;
-       }
-       
-       // called from GenTFMtdReducer and ApplyTf (Spark)
-       public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
-               init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
-               _tfMtdDir = tfMtdDir;
-       }
-
-       protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
-                       throws IOException {
-               // check non-existing file
-               if (!fs.exists(path))
-                       if ( err )
-                               throw new IOException("File " + path.toString() 
+ " does not exist on HDFS/LFS.");
-                       else
-                               return false;
-
-               // check for empty file
-               if( MapReduceTool.isFileEmpty(fs, path) )
-                       if ( err )
-                               throw new EOFException("Empty input file " + 
path.toString() + ".");
-                       else
-                               return false;
-               
-               return true;
-       }
-       
-       public static String getPartFileName(JobConf job) throws IOException {
-               Path path = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               path = path.makeQualified(fs);
-               return path.toString();
-       }
-       
-       public static boolean isPartFileWithHeader(JobConf job) throws IOException {
-               String thisfile=getPartFileName(job);
-               Path path = new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE));
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               path = path.makeQualified(fs);
-               return thisfile.toString().equals(path.toString());
-       }
-       
-       /**
-        * Prepare NA strings so that they can be sent to workers via JobConf.
-        * A "dummy" string is added at the end to handle the case of empty 
strings.
-        * @param na NA string
-        * @return NA string concatenated with NA string separator concatenated 
with "dummy"
-        */
-       public static String prepNAStrings(String na) {
-               return na  + DataExpression.DELIM_NA_STRING_SEP + "dummy";
-       }
-       
-       public static String[] parseNAStrings(String na) 
-       {
-               if ( na == null )
-                       return null;
-               
-               String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
-               return tmp; //Arrays.copyOf(tmp, tmp.length-1);
-       }
-       
-       public static String[] parseNAStrings(JobConf job) 
-       {
-               return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
-       }
-       
-       private void createAgents(JSONObject spec, String[] naStrings) 
-               throws IOException, JSONException 
-       {
-               _oa = new OmitAgent(spec, _outputColumnNames, _numInputCols);
-               _mia = new MVImputeAgent(spec, null, naStrings, _numInputCols);
-               _ra = new RecodeAgent(spec, _outputColumnNames, _numInputCols);
-               _ba = new BinAgent(spec, _outputColumnNames, _numInputCols);
-               _da = new DummycodeAgent(spec, _outputColumnNames, _numInputCols);
-       }
-       
-       private void parseColumnNames() {
-               _outputColumnNames = _delim.split(_headerLine, -1);
-               for(int i=0; i < _outputColumnNames.length; i++)
-                       _outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
-       }
-       
-       private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
-       {
-               _numRecordsInPartFile = 0;
-               _numValidRecords = 0;
-               _numTransformedRows = 0;
-               _numTransformedColumns = 0;
-               
-               //TODO: fix hard-wired header propagation to meta data column names
-               
-               _headerLine = headerLine;
-               _hasHeader = hasHeader;
-               _delimString = delim;
-               _delim = Pattern.compile(Pattern.quote(delim));
-               _NAstrings = naStrings;
-               _numInputCols = (int)numCols;
-               _offsetFile = offsetFile;
-               _tmpDir = tmpPath;
-               _outputPath = outputPath;
-               
-               parseColumnNames();             
-               createAgents(spec, naStrings);
-       }
-       
-       public void incrValid() { _numValidRecords++; }
-       public long getValid()  { return _numValidRecords; }
-       public long getTotal()  { return _numRecordsInPartFile; }
-       public long getNumTransformedRows()     { return _numTransformedRows; }
-       public long getNumTransformedColumns()  { return _numTransformedColumns; }
-       
        public String getHeader()               { return _headerLine; }
        public boolean hasHeader()              { return _hasHeader; }
        public String getDelimString()  { return _delimString; }
@@ -286,24 +87,6 @@ public class TfUtils implements Serializable{
        public String[] getNAStrings()  { return _NAstrings; }
        public long getNumCols()                { return _numInputCols; }
        
-       public String getSpec()         { return _spec; }
-       public String getTfMtdDir()     { return _tfMtdDir; }
-       public String getOffsetFile()   { return _offsetFile; }
-       public String getTmpDir()               { return _tmpDir; }
-       public String getOutputPath()   { return _outputPath; }
-       
-       public String getName(int colID) { return _outputColumnNames[colID-1]; }
-       
-       public void setValid(long n) { _numValidRecords = n;}
-       public void incrTotal() { _numRecordsInPartFile++; }
-       public void setTotal(long n) { _numRecordsInPartFile = n;}
-       
-       public OmitAgent          getOmitAgent()        {       return _oa; }
-       public MVImputeAgent  getMVImputeAgent(){       return _mia;}
-       public RecodeAgent        getRecodeAgent()      {       return _ra; }
-       public BinAgent           getBinAgent()         {       return _ba; }
-       public DummycodeAgent getDummycodeAgent() { return _da; }
-       
        /**
         * Function that checks if the given string is one of NA strings.
         * 
@@ -321,229 +104,4 @@ public class TfUtils implements Serializable{
                }
                return false;
        }
-       
-       public String[] getWords(Text line) {
-               return getWords(line.toString());
-       }
-       
-
-       public String[] getWords(String line) {
-               return getDelim().split(line.trim(), -1);
-       }
-       
-       /**
-        * Process a given row to construct transformation metadata.
-        * 
-        * @param line string to break into words
-        * @return string array of words from the line
-        * @throws IOException if IOException occurs
-        */
-       public String[] prepareTfMtd(String line) throws IOException {
-               String[] words = getWords(line);
-               if(!getOmitAgent().omit(words, this))
-               {
-                       getMVImputeAgent().prepare(words);
-                       getRecodeAgent().prepare(words, this);
-                       getBinAgent().prepare(words, this);
-                       incrValid();
-               }
-               incrTotal();
-               
-               return words;
-       }
-       
-       public void loadTfMetadata() throws IOException 
-       {
-               JobConf job = ConfigurationManager.getCachedJobConf();
-               loadTfMetadata(job, false);
-       }
-       
-       public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
-       {
-               Path tfMtdDir = null; 
-               FileSystem fs = null;
-               
-               if(fromLocalFS) {
-                       // metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
-                       tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
-                       fs = FileSystem.getLocal(job);
-               }
-               else {
-                       tfMtdDir = new Path(getTfMtdDir());
-                       fs = IOUtilFunctions.getFileSystem(tfMtdDir, job);
-               }
-               
-               // load transformation metadata 
-               getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-               getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-               getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
-               
-               // associate recode maps and bin definitions with dummycoding agent,
-               // as recoded and binned columns are typically dummycoded
-               getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
-               getDummycodeAgent().setNumBins(getBinAgent().getColList(), getBinAgent().getNumBins());
-               getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
-       }
-
-       public String processHeaderLine() throws IOException 
-       {
-               //TODO: fix hard-wired header propagation to meta data column names
-               
-               FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
-               String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
-               getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
-               
-               // write header information (before and after transformation) to temporary path
-               // these files are copied into txMtdPath, once the ApplyTf job is complete.
-               DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
-
-               return dcdHeader;
-               //_numTransformedColumns = getDelim().split(dcdHeader, -1).length; 
-               //return _numTransformedColumns;
-       }
-
-       public boolean omit(String[] words) {
-               if(getOmitAgent() == null)
-                       return false;
-               return getOmitAgent().omit(words, this);
-       }
-       
-       /**
-        * Function to apply transformation metadata on a given row.
-        * 
-        * @param words string array of words
-        * @return string array of transformed words
-        */
-       public String[] apply( String[] words ) {
-               words = getMVImputeAgent().apply(words);
-               words = getRecodeAgent().apply(words);
-               words = getBinAgent().apply(words);
-               words = getDummycodeAgent().apply(words);               
-               _numTransformedRows++;
-               
-               return words;
-       }
-       
-       public void check(String []words) throws DMLRuntimeException 
-       {
-               boolean checkEmptyString = ( getNAStrings() != null );
-               if ( checkEmptyString ) 
-               {
-                       final String msg = "When na.strings are provided, empty 
string \"\" is considered as a missing value, and it must be imputed 
appropriately. Encountered an unhandled empty string in column ID: ";
-                       for(int i=0; i<words.length; i++) 
-                               if ( words[i] != null && words[i].equals(""))
-                                       throw new DMLRuntimeException(msg + 
getDummycodeAgent().mapDcdColumnID(i+1));
-               }
-       }
-       
-       public String checkAndPrepOutputString(String []words) throws DMLRuntimeException {
-               return checkAndPrepOutputString(words, new StringBuilder());
-       }
-       
-       public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException 
-       {
-               /*
-                * Check if empty strings ("") have to be handled.
-                * 
-                * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
-                * When na.strings are provided, then "" is considered a missing value indicator, and the 
-                * user is expected to provide an appropriate imputation method. Therefore, when na.strings 
-                * are provided, "" encountered in any column (after all transformations are applied) 
-                * denotes an erroneous condition.  
-                */
-               boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
-               
-               //StringBuilder sb = new StringBuilder();
-               sb.setLength(0);
-               int i =0;
-               
-               if ( checkEmptyString ) 
-               {
-                       final String msg = "When na.strings are provided, empty 
string \"\" is considered as a missing value, and it must be imputed 
appropriately. Encountered an unhandled empty string in column ID: ";
-                       if ( words[0] != null ) 
-                               if ( words[0].equals("") )
-                                       throw new DMLRuntimeException( msg + 
getDummycodeAgent().mapDcdColumnID(1));
-                               else 
-                                       sb.append(words[0]);
-                       else
-                               sb.append("0");
-                       
-                       for(i=1; i<words.length; i++) 
-                       {
-                               sb.append(_delimString);
-                               
-                               if ( words[i] != null ) 
-                                       if ( words[i].equals("") )
-                                               throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-                                       else 
-                                               sb.append(words[i]);
-                               else
-                                       sb.append("0");
-                       }
-               }
-               else 
-               {
-                       sb.append(words[0] != null ? words[0] : "0");
-                       for(i=1; i<words.length; i++) 
-                       {
-                               sb.append(_delimString);
-                               sb.append(words[i] != null ? words[i] : "0");
-                       }
-               }
-               
-               return sb.toString();
-       }
-
-       private Reader initOffsetsReader(JobConf job) throws IOException 
-       {
-               Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
-               if ( files.length != 1 )
-                       throw new IOException("Expecting a single file under 
counters file: " + path.toString());
-               
-               Reader reader = new SequenceFile.Reader(fs, files[0], job);
-               
-               return reader;
-       }
-       
-       /**
-        * Function to generate custom file names (transform-part-.....) for
-        * mappers' output for ApplyTfCSV job. The idea is to find the index 
-        * of (thisfile, fileoffset) in the list of all offsets from the 
-        * counters/offsets file, which was generated from either GenTfMtdMR
-        * or AssignRowIDMR job.
-        * 
-        * @param job job configuration
-        * @param offset file offset
-        * @return part file id (ie, 00001, 00002, etc)
-        * @throws IOException if IOException occurs
-        */
-       public String getPartFileID(JobConf job, long offset) throws IOException
-       {
-               Reader reader = null;
-               int id = 0;
-               try {
-                       reader = initOffsetsReader(job);
-                       ByteWritable key=new ByteWritable();
-                       OffsetCount value=new OffsetCount();
-                       String thisFile = TfUtils.getPartFileName(job);
-                       while (reader.next(key, value)) {
-                               if ( thisFile.equals(value.filename) && value.fileOffset == offset ) 
-                                       break;
-                               id++;
-                       }
-               }
-               finally {
-                       IOUtilFunctions.closeSilently(reader);
-               }
-               
-               String sid = Integer.toString(id);
-               char[] carr = new char[5-sid.length()];
-               Arrays.fill(carr, '0');
-               String ret = (new String(carr)).concat(sid);
-               
-               return ret;
-       }
 }

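Note on the remaining TfUtils API: with the file-based MR pipeline removed, the class shrinks to the CSV parsing state (header, delimiter, NA strings) and a few static helpers. A minimal sketch of the retained NA check, as the new encoders invoke it via TfUtils.isNA(agents.getNAStrings(), w) further below; the sample tokens are illustrative, not from this patch:

    String[] naStrings = new String[]{ "", "NA", "?" };
    for( String token : new String[]{ "42", "NA", "" } ) {
        // true for "NA" and "", i.e., tokens treated as missing values
        boolean missing = TfUtils.isNA(naStrings, token);
        System.out.println(token + " -> missing: " + missing);
    }
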
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
index a3f01a1..304dcdb 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
@@ -19,20 +19,10 @@
 
 package org.apache.sysml.runtime.transform.encode;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.wink.json4j.JSONArray;
 
@@ -152,11 +142,4 @@ public abstract class Encoder implements Serializable
         * @param meta frame block
         */
        public abstract void initMetaData(FrameBlock meta);
-       
-       
-       //OLD API: kept for a transition phase only
-       //TODO stage 2: refactor data and meta data IO into minimal set of ultility functions
-       abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
-       abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
-       abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
 }

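Note on the slimmed-down Encoder contract: after dropping the old file-based API, a concrete encoder only implements the frame-based methods. A hedged sketch of a no-op encoder, assuming encode/build/apply/getMetaData/initMetaData (the methods visible in this patch) are the complete remaining abstract set, and reusing the imports of the surrounding files:

    public class EncoderNoop extends Encoder {
        private static final long serialVersionUID = 1L;
        public EncoderNoop(int[] colList, int clen) { super(colList, clen); }
        @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
            build(in);              // single pass: collect stats, then apply
            return apply(in, out);
        }
        @Override public void build(FrameBlock in) { /* nothing to collect */ }
        @Override public String[] apply(String[] words) { return words; }
        @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
            // pass a numeric view of the input through unchanged
            for( int i=0; i<in.getNumRows(); i++ )
                for( int j=0; j<in.getNumColumns(); j++ )
                    out.quickSetValue(i, j, UtilFunctions.objectToDouble(
                        in.getSchema()[j], in.get(i, j)));
            return out;
        }
        @Override public FrameBlock getMetaData(FrameBlock meta) { return meta; }
        @Override public void initMetaData(FrameBlock meta) { /* no meta data */ }
    }
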
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
new file mode 100644
index 0000000..fbe6994
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderBin.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class EncoderBin extends Encoder 
+{      
+       private static final long serialVersionUID = 1917445005206076078L;
+
+       public static final String MIN_PREFIX = "min";
+       public static final String MAX_PREFIX = "max";
+       public static final String NBINS_PREFIX = "nbins";
+
+       private int[] _numBins = null;
+       private double[] _min=null, _max=null;  // min and max among non-missing values
+       private double[] _binWidths = null;             // width of a bin for each attribute
+       
+       //frame transform-apply attributes
+       private double[][] _binMins = null;
+       private double[][] _binMaxs = null;
+       
+       public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen) 
+               throws JSONException, IOException 
+       {
+               this(parsedSpec, colnames, clen, false);
+       }
+
+       public EncoderBin(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly) 
+               throws JSONException, IOException 
+       {
+               super( null, clen );            
+               if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
+                       return;
+               
+               if( colsOnly ) {
+                       List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames);
+                       initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0])));
+               }
+               else 
+               {
+                       JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN);
+                       JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
+                       JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
+                       initColList(attrs);
+                       
+                       _numBins = new int[attrs.size()];
+                       for(int i=0; i < _numBins.length; i++)
+                               _numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
+                       
+                       // initialize internal transformation metadata
+                       _min = new double[_colList.length];
+                       Arrays.fill(_min, Double.MAX_VALUE);
+                       _max = new double[_colList.length];
+                       Arrays.fill(_max, -Double.MAX_VALUE);
+                       
+                       _binWidths = new double[_colList.length];
+               }
+       }
+
+       public void prepare(String[] words, TfUtils agents) {
+               if ( !isApplicable() )
+                       return;
+               
+               for(int i=0; i <_colList.length; i++) {
+                       int colID = _colList[i];
+                       
+                       String w = null;
+                       double d = 0;
+                               
+                       // equi-width
+                       w = UtilFunctions.unquote(words[colID-1].trim());
+                       if(!TfUtils.isNA(agents.getNAStrings(),w)) {
+                               d = UtilFunctions.parseToDouble(w);
+                               if(d < _min[i])
+                                       _min[i] = d;
+                               if(d > _max[i])
+                                       _max[i] = d;
+                       }
+               }
+       }
+               
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               build(in);
+               return apply(in, out);
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               // nothing to do
+       }
+       
+       /**
+        * Method to apply transformations.
+        */
+       @Override
+       public String[] apply(String[] words) {
+               if( !isApplicable() )
+                       return words;
+       
+               for(int i=0; i < _colList.length; i++) {
+                       int colID = _colList[i];
+                       try {
+                               double val = UtilFunctions.parseToDouble(words[colID-1]);
+                               int binid = 1;
+                               double tmp = _min[i] + _binWidths[i];
+                               while(val > tmp && binid < _numBins[i]) {
+                                       tmp += _binWidths[i];
+                                       binid++;
+                               }
+                               words[colID-1] = Integer.toString(binid);
+                       } 
+                       catch(NumberFormatException e) {
+                               throw new RuntimeException("Encountered \"" + 
words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric 
value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an 
appropriate imputation method.");
+                       }
+               }
+               
+               return words;
+       }
+
+       @Override
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+               for(int j=0; j<_colList.length; j++) {
+                       int colID = _colList[j];
+                       for( int i=0; i<in.getNumRows(); i++ ) {
+                               double inVal = UtilFunctions.objectToDouble(
+                                               in.getSchema()[colID-1], in.get(i, colID-1));
+                               int ix = Arrays.binarySearch(_binMaxs[j], inVal);
+                               int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1;
+                               out.quickSetValue(i, colID-1, binID);
+                       }       
+               }
+               return out;
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock meta) {
+               return meta;
+       }
+       
+       @Override
+       public void initMetaData(FrameBlock meta) {
+               _binMins = new double[_colList.length][];
+               _binMaxs = new double[_colList.length][];
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
+                       _binMins[j] = new double[nbins];
+                       _binMaxs[j] = new double[nbins];
+                       for( int i=0; i<nbins; i++ ) {
+                               String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX);
+                               _binMins[j][i] = Double.parseDouble(tmp[0]);
+                               _binMaxs[j][i] = Double.parseDouble(tmp[1]);
+                       }
+               }
+       }
+}

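Note on the bin lookup in apply(FrameBlock, MatrixBlock): Arrays.binarySearch returns -(insertionPoint)-1 for a missing key, so ((ix < 0) ? Math.abs(ix+1) : ix) + 1 yields the 1-based index of the first bin whose upper bound is >= the value. A self-contained illustration with made-up bin boundaries:

    double[] binMaxs = { 10.0, 20.0, 30.0 };  // upper bounds of three bins
    for( double val : new double[]{ 5.0, 10.0, 10.5, 29.9 } ) {
        int ix = java.util.Arrays.binarySearch(binMaxs, val);
        int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1;
        System.out.println(val + " -> bin " + binID);  // bins 1, 1, 2, 3
    }
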
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index 9efbc19..deff887 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -19,20 +19,11 @@
 
 package org.apache.sysml.runtime.transform.encode;
 
-import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
 
 /**
  * Simple composite encoder that applies a list of encoders 
@@ -90,7 +81,6 @@ public class EncoderComposite extends Encoder
                        encoder.build(in);
        }
 
-
        @Override
        public String[] apply(String[] in) {
                for( Encoder encoder : _encoders )
@@ -119,19 +109,4 @@ public class EncoderComposite extends Encoder
                for( Encoder encoder : _encoders )
                        encoder.initMetaData(out);
        }
-
-       @Override
-       public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-               throw new RuntimeException("File-based api not supported.");
-       }
-
-       @Override
-       public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-               throw new RuntimeException("File-based api not supported.");    
-       }
-
-       @Override
-       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-               throw new RuntimeException("File-based api not supported.");
-       }
 }

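Note on the composite delegation: EncoderComposite chains its child encoders in list order, so recode must precede dummycode for category indices to exist before expansion. The constructor wiring is not visible in this hunk, so the list assembly below is an assumption for illustration only:

    JSONObject jSpec = new JSONObject("{}");  // illustrative empty spec
    String[] colnames = new String[]{ "c1", "c2" };
    List<Encoder> encoders = new ArrayList<Encoder>();
    encoders.add(new EncoderRecode(jSpec, colnames, colnames.length));    // categories -> codes
    encoders.add(new EncoderDummycode(jSpec, colnames, colnames.length)); // codes -> 0/1 columns
    String[] words = new String[]{ "A", "1.5" };
    for( Encoder encoder : encoders )  // same loop shape as apply(String[]) above
        words = encoder.apply(words);
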
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
new file mode 100644
index 0000000..743381a
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderDummycode.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+public class EncoderDummycode extends Encoder 
+{              
+       private static final long serialVersionUID = 5832130477659116489L;
+
+       private int[] _domainSizes = null;                      // length = #of dummycoded columns
+       private long _dummycodedLength = 0;                     // #of columns after dummycoded
+
+       public EncoderDummycode(JSONObject parsedSpec, String[] colnames, int clen) throws JSONException {
+               super(null, clen);
+               
+               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) {
+                       int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_DUMMYCODE);
+                       initColList(collist);
+               }
+       }
+       
+       @Override
+       public int getNumCols() {
+               return (int)_dummycodedLength;
+       }
+       
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               return apply(in, out);
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               //do nothing
+       }
+       
+       /**
+        * Method to apply transformations.
+        * 
+        * @param words array of strings
+        * @return array of transformed strings
+        */
+       @Override
+       public String[] apply(String[] words) 
+       {
+               if( !isApplicable() )
+                       return words;
+               
+               String[] nwords = new String[(int)_dummycodedLength];
+               int rcdVal = 0;
+               
+               for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) {
+                       if(idx < _colList.length && colID==_colList[idx]) {
+                               // dummycoded columns
+                               try {
+                                       rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
+                                       nwords[ ncolID-1+rcdVal-1 ] = "1";
+                                       ncolID += _domainSizes[idx];
+                                       idx++;
+                               } 
+                               catch (Exception e) {
+                                       throw new RuntimeException("Error in 
dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] 
+                                                       + ", domainSize=" + 
_domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
+                               }
+                       }
+                       else {
+                               nwords[ncolID-1] = words[colID-1];
+                               ncolID++;
+                       }
+               }
+               
+               return nwords;
+       }
+       
+       @Override
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) 
+       {
+               MatrixBlock ret = new MatrixBlock(out.getNumRows(), (int)_dummycodedLength, false);
+               
+               for( int i=0; i<out.getNumRows(); i++ ) {
+                       for(int colID=1, idx=0, ncolID=1; colID <= out.getNumColumns(); colID++) {
+                               double val = out.quickGetValue(i, colID-1);
+                               if(idx < _colList.length && colID==_colList[idx]) {
+                                       ret.quickSetValue(i, ncolID-1+(int)val-1, 1);
+                                       ncolID += _domainSizes[idx];
+                                       idx++;
+                               }
+                               else {
+                                       double ptval = UtilFunctions.objectToDouble(in.getSchema()[colID-1], in.get(i, colID-1));
+                                       ret.quickSetValue(i, ncolID-1, ptval);
+                                       ncolID++;
+                               }
+                       }
+               }
+               
+               return ret;
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               return out;
+       }
+       
+       @Override
+       public void initMetaData(FrameBlock meta) {
+               //initialize domain sizes and output num columns
+               _domainSizes = new int[_colList.length];
+               _dummycodedLength = _clen;
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       _domainSizes[j] = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
+                       _dummycodedLength +=  _domainSizes[j]-1;
+               }
+       }
+}

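Note on the output width computed in initMetaData: the dummycoded length is the input width plus, for every dummycoded column, its domain size minus one (the original column is replaced by domainSize 0/1 columns). A worked example with made-up sizes:

    // clen = 4 input columns; column 2 is dummycoded with 3 distinct values
    int clen = 4;
    int[] domainSizes = { 3 };
    long dummycodedLength = clen;
    for( int j=0; j<domainSizes.length; j++ )
        dummycodedLength += domainSizes[j] - 1;  // 4 + (3-1) = 6 output columns
    // in apply(), recode value v of column 2 writes output index ncolID-1+v-1,
    // i.e., values 1..3 map to output columns 2..4 (0-based indexes 1..3)
    System.out.println(dummycodedLength);  // 6
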
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
index f7ceefd..13b2810 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -28,11 +28,6 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.transform.BinAgent;
-import org.apache.sysml.runtime.transform.DummycodeAgent;
-import org.apache.sysml.runtime.transform.MVImputeAgent;
-import org.apache.sysml.runtime.transform.OmitAgent;
-import org.apache.sysml.runtime.transform.RecodeAgent;
 import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -40,7 +35,6 @@ import org.apache.wink.json4j.JSONObject;
 
 public class EncoderFactory 
 {
-
        public static Encoder createEncoder(String spec, String[] colnames, int clen, FrameBlock meta) throws DMLRuntimeException {
                return createEncoder(spec, colnames, UtilFunctions.nCopies(clen, ValueType.STRING), meta);
        }
@@ -79,7 +73,7 @@ public class EncoderFactory
                        
                        //create individual encoders
                        if( !rcIDs.isEmpty() ) {
-                               RecodeAgent ra = new RecodeAgent(jSpec, colnames, clen);
+                               EncoderRecode ra = new EncoderRecode(jSpec, colnames, clen);
                                ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0])));
                                lencoders.add(ra);      
                        }
@@ -87,13 +81,13 @@ public class EncoderFactory
                                lencoders.add(new EncoderPassThrough(
                                                ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen));
                        if( !dcIDs.isEmpty() )
-                               lencoders.add(new DummycodeAgent(jSpec, colnames, schema.length));
+                               lencoders.add(new EncoderDummycode(jSpec, colnames, schema.length));
                        if( !binIDs.isEmpty() )
-                               lencoders.add(new BinAgent(jSpec, colnames, schema.length, true));
+                               lencoders.add(new EncoderBin(jSpec, colnames, schema.length, true));
                        if( !oIDs.isEmpty() )
-                               lencoders.add(new OmitAgent(jSpec, colnames, schema.length));
+                               lencoders.add(new EncoderOmit(jSpec, colnames, schema.length));
                        if( !mvIDs.isEmpty() ) {
-                               MVImputeAgent ma = new MVImputeAgent(jSpec, colnames, schema.length);
+                               EncoderMVImpute ma = new EncoderMVImpute(jSpec, colnames, schema.length);
                                ma.initRecodeIDList(rcIDs);
                                lencoders.add(ma);
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
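Note on factory usage after the Agent-to-Encoder renaming: createEncoder(spec, colnames, clen, meta) assembles the new encoder classes from a JSON spec. A hedged usage sketch; the spec keys, column names, and FrameBlock population below are illustrative assumptions, not taken from this patch, and passing null meta assumes build() derives the recode maps during encode:

    String spec = "{ \"recode\": [ \"gender\" ], \"dummycode\": [ \"gender\" ] }";
    String[] colnames = new String[]{ "gender", "age" };
    FrameBlock in = new FrameBlock(UtilFunctions.nCopies(2, ValueType.STRING));
    in.appendRow(new String[]{ "F", "34" });
    in.appendRow(new String[]{ "M", "28" });
    Encoder encoder = EncoderFactory.createEncoder(spec, colnames, colnames.length, null);
    MatrixBlock out = encoder.encode(in,
        new MatrixBlock(in.getNumRows(), encoder.getNumCols(), false));
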
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
new file mode 100644
index 0000000..55a0bde
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderMVImpute.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.runtime.functionobjects.CM;
+import org.apache.sysml.runtime.functionobjects.Mean;
+import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class EncoderMVImpute extends Encoder 
+{      
+       private static final long serialVersionUID = 9057868620144662194L;
+
+       public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT };
+       
+       private MVMethod[] _mvMethodList = null;
+       private MVMethod[] _mvscMethodList = null;      // scaling methods for attributes that are imputed and also scaled
+       
+       private BitSet _isMVScaled = null;
+       private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE);         // function object that understands variance computation
+       
+       // objects required to compute mean and variance of all non-missing entries 
+       private Mean _meanFn = Mean.getMeanFnObject();  // function object that understands mean computation
+       private KahanObject[] _meanList = null;                 // column-level means, computed so far
+       private long[] _countList = null;                               // #of non-missing values
+       
+       private CM_COV_Object[] _varList = null;                // column-level variances, computed so far (for scaling)
+
+       private int[]                   _scnomvList = null;                     // List of attributes that are scaled but not imputed
+       private MVMethod[]              _scnomvMethodList = null;       // scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
+       private KahanObject[]   _scnomvMeanList = null;         // column-level means, for attributes scaled but not imputed
+       private long[]                  _scnomvCountList = null;        // #of non-missing values, for attributes scaled but not imputed
+       private CM_COV_Object[] _scnomvVarList = null;          // column-level variances, computed so far
+       
+       private String[] _replacementList = null;               // replacements: for global_mean, mean; and for global_mode, recode id of mode category
+       private String[] _NAstrings = null;
+       private List<Integer> _rcList = null; 
+       private HashMap<Integer,HashMap<String,Long>> _hist = null;
+       
+       public String[] getReplacements() { return _replacementList; }
+       public KahanObject[] getMeans()   { return _meanList; }
+       public CM_COV_Object[] getVars()  { return _varList; }
+       public KahanObject[] getMeans_scnomv()   { return _scnomvMeanList; }
+       public CM_COV_Object[] getVars_scnomv()  { return _scnomvVarList; }
+       
+       public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, int clen) 
+               throws JSONException
+       {
+               super(null, clen);
+               
+               //handle column list
+               int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, colnames, TfUtils.TXMETHOD_IMPUTE);
+               initColList(collist);
+       
+               //handle method list
+               parseMethodsAndReplacments(parsedSpec);
+               
+               //create reuse histograms
+               _hist = new HashMap<Integer, HashMap<String,Long>>();
+       }
+                       
+       public EncoderMVImpute(JSONObject parsedSpec, String[] colnames, String[] NAstrings, int clen)
+               throws JSONException 
+       {
+               super(null, clen);      
+               boolean isMV = parsedSpec.containsKey(TfUtils.TXMETHOD_IMPUTE);
+               boolean isSC = parsedSpec.containsKey(TfUtils.TXMETHOD_SCALE);
+               _NAstrings = NAstrings;
+               
+               if(!isMV) {
+                       // MV Impute is not applicable
+                       _colList = null;
+                       _mvMethodList = null;
+                       _meanList = null;
+                       _countList = null;
+                       _replacementList = null;
+               }
+               else {
+                       JSONObject mvobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+                       JSONArray mvattrs = (JSONArray) mvobj.get(TfUtils.JSON_ATTRS);
+                       JSONArray mvmthds = (JSONArray) mvobj.get(TfUtils.JSON_MTHD);
+                       int mvLength = mvattrs.size();
+                       
+                       _colList = new int[mvLength];
+                       _mvMethodList = new MVMethod[mvLength];
+                       
+                       _meanList = new KahanObject[mvLength];
+                       _countList = new long[mvLength];
+                       _varList = new CM_COV_Object[mvLength];
+                       
+                       _isMVScaled = new BitSet(_colList.length);
+                       _isMVScaled.clear();
+                       
+                       for(int i=0; i < _colList.length; i++) {
+                               _colList[i] = UtilFunctions.toInt(mvattrs.get(i));
+                               _mvMethodList[i] = MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))]; 
+                               _meanList[i] = new KahanObject(0, 0);
+                       }
+                       
+                       _replacementList = new String[mvLength];        // contains replacements for all columns (scale and categorical)
+                       
+                       JSONArray constants = (JSONArray)mvobj.get(TfUtils.JSON_CONSTS);
+                       for(int i=0; i < constants.size(); i++) {
+                               if ( constants.get(i) == null )
+                                       _replacementList[i] = "NaN";
+                               else
+                                       _replacementList[i] = constants.get(i).toString();
+                       }
+               }
+               
+               // Handle scaled attributes
+               if ( !isSC )
+               {
+                       // scaling is not applicable
+                       _scnomvCountList = null;
+                       _scnomvMeanList = null;
+                       _scnomvVarList = null;
+               }
+               else
+               {
+                       if ( _colList != null ) 
+                               _mvscMethodList = new MVMethod[_colList.length];
+                       
+                       JSONObject scobj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_SCALE);
+                       JSONArray scattrs = (JSONArray) scobj.get(TfUtils.JSON_ATTRS);
+                       JSONArray scmthds = (JSONArray) scobj.get(TfUtils.JSON_MTHD);
+                       int scLength = scattrs.size();
+                       
+                       int[] _allscaled = new int[scLength];
+                       int scnomv = 0, colID;
+                       byte mthd;
+                       for(int i=0; i < scLength; i++)
+                       {
+                               colID = UtilFunctions.toInt(scattrs.get(i));
+                               mthd = (byte) UtilFunctions.toInt(scmthds.get(i)); 
+                                               
+                               _allscaled[i] = colID;
+                               
+                               // check if the attribute is also MV imputed
+                               int mvidx = isApplicable(colID);
+                               if(mvidx != -1)
+                               {
+                                       _isMVScaled.set(mvidx);
+                                       _mvscMethodList[mvidx] = MVMethod.values()[mthd];
+                                       _varList[mvidx] = new CM_COV_Object();
+                               }
+                               else
+                                       scnomv++;       // count of scaled but 
not imputed 
+                       }
+                       
+                       if(scnomv > 0)
+                       {
+                               _scnomvList = new int[scnomv];                  
+                               _scnomvMethodList = new MVMethod[scnomv];       
+       
+                               _scnomvMeanList = new KahanObject[scnomv];
+                               _scnomvCountList = new long[scnomv];
+                               _scnomvVarList = new CM_COV_Object[scnomv];
+                               
+                               for(int i=0, idx=0; i < scLength; i++)
+                               {
+                                       colID = UtilFunctions.toInt(scattrs.get(i));
+                                       mthd = (byte)UtilFunctions.toInt(scmthds.get(i));
+                                       
+                                       if(isApplicable(colID) == -1)
+                                       {       // scaled but not imputed
+                                               _scnomvList[idx] = colID;
+                                               _scnomvMethodList[idx] = MVMethod.values()[mthd];
+                                               _scnomvMeanList[idx] = new KahanObject(0, 0);
+                                               _scnomvVarList[idx] = new CM_COV_Object();
+                                               idx++;
+                                       }
+                               }
+                       }
+               }
+       }
+
+       private void parseMethodsAndReplacments(JSONObject parsedSpec) throws JSONException {
+               JSONArray mvspec = (JSONArray) parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+               _mvMethodList = new MVMethod[mvspec.size()];
+               _replacementList = new String[mvspec.size()];
+               _meanList = new KahanObject[mvspec.size()];
+               _countList = new long[mvspec.size()];
+               for(int i=0; i < mvspec.size(); i++) {
+                       JSONObject mvobj = (JSONObject)mvspec.get(i);
+                       _mvMethodList[i] = MVMethod.valueOf(mvobj.get("method").toString().toUpperCase());
+                       if( _mvMethodList[i] == MVMethod.CONSTANT ) {
+                               _replacementList[i] = mvobj.getString("value");
+                       }
+                       _meanList[i] = new KahanObject(0, 0);
+               }
+       }
+               
+       public void prepare(String[] words) throws IOException {
+               
+               try {
+                       String w = null;
+                       if(_colList != null)
+                       for(int i=0; i <_colList.length; i++) {
+                               int colID = _colList[i];
+                               w = UtilFunctions.unquote(words[colID-1].trim());
+                               
+                               try {
+                               if(!TfUtils.isNA(_NAstrings, w)) {
+                                       _countList[i]++;
+                                       
+                                       boolean computeMean = (_mvMethodList[i] == MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) );
+                                       if(computeMean) {
+                                               // global_mean
+                                               double d = UtilFunctions.parseToDouble(w);
+                                               _meanFn.execute2(_meanList[i], d, _countList[i]);
+                                               
+                                               if (_isMVScaled.get(i) && _mvscMethodList[i] == MVMethod.GLOBAL_MODE)
+                                                       _varFn.execute(_varList[i], d);
+                                       }
+                                       else {
+                                               // global_mode or constant
+                                               // Nothing to do here. Mode is computed using recode maps.
+                                       }
+                               }
+                               } catch (NumberFormatException e) {
+                                       throw new RuntimeException("Encountered \"" + w + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + w + "\" to na.strings, along with an appropriate imputation method.");
+                               }
+                       }
+                       
+                       // Compute mean and variance for attributes that are scaled but not imputed
+                       if(_scnomvList != null)
+                       for(int i=0; i < _scnomvList.length; i++)
+                       {
+                               int colID = _scnomvList[i];
+                               w = UtilFunctions.unquote(words[colID-1].trim());
+                               double d = UtilFunctions.parseToDouble(w);
+                               _scnomvCountList[i]++;          // not required, this is always equal to total #records processed
+                               _meanFn.execute2(_scnomvMeanList[i], d, _scnomvCountList[i]);
+                               if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE)
+                                       _varFn.execute(_scnomvVarList[i], d);
+                       }
+               } catch(Exception e) {
+                       throw new IOException(e);
+               }
+       }
+       
+       public MVMethod getMethod(int colID) {
+               int idx = isApplicable(colID);          
+               if(idx == -1)
+                       return MVMethod.INVALID;
+               else
+                       return _mvMethodList[idx];
+       }
+       
+       public long getNonMVCount(int colID) {
+               int idx = isApplicable(colID);
+               return (idx == -1) ? 0 : _countList[idx];
+       }
+       
+       public String getReplacement(int colID)  {
+               int idx = isApplicable(colID);          
+               return (idx == -1) ? null : _replacementList[idx];
+       }
+       
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               build(in);
+               return apply(in, out);
+       }
+       
+       @Override
+       public void build(FrameBlock in) {
+               try {
+                       for( int j=0; j<_colList.length; j++ ) {
+                               int colID = _colList[j];
+                               if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) {
+                                       //compute global column mean (scale)
+                                       long off = _countList[j];
+                                       for( int i=0; i<in.getNumRows(); i++ )
+                                               _meanFn.execute2(_meanList[j], UtilFunctions.objectToDouble(
+                                                       in.getSchema()[colID-1], in.get(i, colID-1)), off+i+1);
+                                       _replacementList[j] = String.valueOf(_meanList[j]._sum);
+                                       _countList[j] += in.getNumRows();
+                               }
+                               else if( _mvMethodList[j] == MVMethod.GLOBAL_MODE ) {
+                                       //compute global column mode (categorical), i.e., most frequent category
+                                       HashMap<String,Long> hist = _hist.containsKey(colID) ?
+                                                       _hist.get(colID) : new HashMap<String,Long>();
+                                       for( int i=0; i<in.getNumRows(); i++ ) {
+                                               String key = String.valueOf(in.get(i, colID-1));
+                                               if( key != null && !key.isEmpty() ) {
+                                                       Long val = hist.get(key);
+                                                       hist.put(key, (val!=null) ? val+1 : 1);
+                                               }
+                                       }
+                                       _hist.put(colID, hist);
+                                       long max = Long.MIN_VALUE;
+                                       for( Entry<String, Long> e : hist.entrySet() )
+                                               if( e.getValue() > max ) {
+                                                       _replacementList[j] = e.getKey();
+                                                       max = e.getValue();
+                                               }
+                               }
+                       }
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+       }
+
+       @Override
+       public String[] apply(String[] words) 
+       {       
+               if( isApplicable() )
+                       for(int i=0; i < _colList.length; i++) {
+                               int colID = _colList[i];
+                               String w = UtilFunctions.unquote(words[colID-1]);
+                               if(TfUtils.isNA(_NAstrings, w))
+                                       w = words[colID-1] = _replacementList[i];
+                               
+                               if ( _isMVScaled.get(i) )
+                                       if ( _mvscMethodList[i] == MVMethod.GLOBAL_MEAN )
+                                               words[colID-1] = Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
+                                       else
+                                               words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / _varList[i].mean._sum );
+                       }
+               
+               if(_scnomvList != null)
+               for(int i=0; i < _scnomvList.length; i++)
+               {
+                       int colID = _scnomvList[i];
+                       if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN )
+                               words[colID-1] = Double.toString( UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum );
+                       else
+                               words[colID-1] = Double.toString( (UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / _scnomvVarList[i].mean._sum );
+               }
+                       
+               return words;
+       }
+       
+       @Override
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+               for(int i=0; i<in.getNumRows(); i++) {
+                       for(int j=0; j<_colList.length; j++) {
+                               int colID = _colList[j];
+                               if( Double.isNaN(out.quickGetValue(i, colID-1)) )
+                                       out.quickSetValue(i, colID-1, Double.parseDouble(_replacementList[j]));
+                       }
+               }
+               return out;
+       }
+       
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               for( int j=0; j<_colList.length; j++ ) {
+                       out.getColumnMetadata(_colList[j]-1)
+                          .setMvValue(_replacementList[j]);
+               }
+               return out;
+       }
+
+       public void initMetaData(FrameBlock meta) {
+               //init replacement lists, replace recoded values to
+               //apply mv imputation potentially after recoding
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j];        
+                       String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue());
+                       if( _rcList.contains(colID) ) {
+                               Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal);
+                               if( mvVal2 == null)
+                                       throw new RuntimeException("Missing recode value for impute value '"+mvVal+"' (colID="+colID+").");
+                               _replacementList[j] = mvVal2.toString();
+                       }
+                       else {
+                               _replacementList[j] = mvVal;
+                       }
+               }
+       }
+
+       public void initRecodeIDList(List<Integer> rcList) {
+               _rcList = rcList;
+       }
+       
+       /**
+        * Exposes the internal histogram after build.
+        * 
+        * @param colID column ID
+        * @return histogram (map of string keys and long values)
+        */
+       public HashMap<String,Long> getHistogram( int colID ) {
+               return _hist.get(colID);
+       }
+}
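
For context when reviewing, here is a minimal caller sketch (not part of this patch) of the in-memory encoder API used above; the driver class, its parameters, and the pre-allocated frames are illustrative assumptions, since FrameBlock construction is outside this diff:

    import org.apache.sysml.runtime.matrix.data.FrameBlock;
    import org.apache.sysml.runtime.matrix.data.MatrixBlock;
    import org.apache.sysml.runtime.transform.encode.Encoder;

    public class ImputeDriverSketch {
        public static MatrixBlock run(Encoder enc, FrameBlock in, int clen, FrameBlock meta) {
            // encode = build() over the input frame + apply() into the matrix
            MatrixBlock out = new MatrixBlock(in.getNumRows(), clen, false);
            out = enc.encode(in, out);
            // export the per-column replacement values as frame meta data
            enc.getMetaData(meta);
            return out;
        }
    }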

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
new file mode 100644
index 0000000..af09cee
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderOmit.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class EncoderOmit extends Encoder 
+{      
+       private static final long serialVersionUID = 1978852120416654195L;
+
+       private int _rmRows = 0;
+
+       public EncoderOmit(JSONObject parsedSpec, String[] colnames, int clen) 
+               throws JSONException 
+       {
+               super(null, clen);
+               if (!parsedSpec.containsKey(TfUtils.TXMETHOD_OMIT))
+                       return;
+               int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_OMIT);
+               initColList(collist);
+       }
+       
+       public int getNumRemovedRows() {
+               return _rmRows;
+       }
+       
+       public boolean omit(String[] words, TfUtils agents) 
+       {
+               if( !isApplicable() )
+                       return false;
+               
+               for(int i=0; i<_colList.length; i++) {
+                       int colID = _colList[i];
+                       if(TfUtils.isNA(agents.getNAStrings(),UtilFunctions.unquote(words[colID-1].trim())))
+                               return true;
+               }
+               return false;
+       }
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               return apply(in, out);
+       }
+       
+       @Override
+       public void build(FrameBlock in) {      
+               //do nothing
+       }
+       
+       @Override
+       public String[] apply(String[] words) {
+               return null;
+       }
+       
+       @Override
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) 
+       {
+               //determine output size
+               int numRows = 0;
+               for(int i=0; i<out.getNumRows(); i++) {
+                       boolean valid = true;
+                       for(int j=0; j<_colList.length; j++)
+                               valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1));
+                       numRows += valid ? 1 : 0;
+               }
+               
+               //copy over valid rows into the output
+               MatrixBlock ret = new MatrixBlock(numRows, out.getNumColumns(), false);
+               int pos = 0;
+               for(int i=0; i<in.getNumRows(); i++) {
+                       //determine if valid row or omit
+                       boolean valid = true;
+                       for(int j=0; j<_colList.length; j++)
+                               valid &= !Double.isNaN(out.quickGetValue(i, _colList[j]-1));
+                       //copy row if necessary
+                       if( valid ) {
+                               for(int j=0; j<out.getNumColumns(); j++)
+                                       ret.quickSetValue(pos, j, out.quickGetValue(i, j));
+                               pos++;
+                       }
+               }
+       
+               //keep info on removed rows
+               _rmRows = out.getNumRows() - pos;
+               
+               return ret; 
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               //do nothing
+               return out;
+       }
+       
+       @Override
+       public void initMetaData(FrameBlock meta) {
+               //do nothing
+       }
+}
+ 
\ No newline at end of file
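
A reading aid for apply() above, which evaluates the same row-validity predicate twice (once to size the output, once while copying): the check, factored into a hypothetical helper that is not in the patch, would look like this:

    import org.apache.sysml.runtime.matrix.data.MatrixBlock;

    class OmitSketch {
        // a row survives omit only if none of the registered columns is NaN
        static boolean keepRow(MatrixBlock out, int i, int[] colList) {
            boolean valid = true;
            for (int j = 0; j < colList.length; j++)
                valid &= !Double.isNaN(out.quickGetValue(i, colList[j] - 1));
            return valid;
        }
    }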

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
index 08722fd..d84ea0d 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
@@ -19,19 +19,10 @@
 
 package org.apache.sysml.runtime.transform.encode;
 
-import java.io.IOException;
-import java.util.Iterator;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.DistinctValue;
-import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
@@ -89,20 +80,4 @@ public class EncoderPassThrough extends Encoder
        public void initMetaData(FrameBlock meta) {
                //do nothing
        }
-       
-       
-       @Override
-       public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-               throw new RuntimeException("File-based api not supported.");
-       }
-
-       @Override
-       public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-               throw new RuntimeException("File-based api not supported.");
-       }
-
-       @Override
-       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-               throw new RuntimeException("File-based api not supported.");
-       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
new file mode 100644
index 0000000..bb8592c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+public class EncoderRecode extends Encoder 
+{      
+       private static final long serialVersionUID = 8213163881283341874L;
+
+       private int[] _mvrcdList = null;
+       private int[] _fullrcdList = null;
+       
+       //recode maps and custom map for partial recode maps 
+       private HashMap<Integer, HashMap<String, Long>> _rcdMaps  = new HashMap<Integer, HashMap<String, Long>>();
+       private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+       private HashMap<Integer, HashSet<Object>> _rcdMapsPart = null;
+       
+       public EncoderRecode(JSONObject parsedSpec, String[] colnames, int clen)
+               throws JSONException 
+       {
+               super(null, clen);
+               int rcdCount = 0;
+               
+               if( parsedSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) {
+                       int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_RECODE);
+                       rcdCount = initColList(collist);
+               }
+               
+               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_MVRCD)) {
+                       _mvrcdList = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_MVRCD);
+                       rcdCount += _mvrcdList.length;
+               }
+               
+               if ( rcdCount > 0 ) {
+                       _fullrcdList = new int[rcdCount];
+                       int idx = -1;
+                       if(_colList != null)
+                               for(int i=0; i < _colList.length; i++)
+                                       _fullrcdList[++idx] = _colList[i]; 
+                       
+                       if(_mvrcdList != null)
+                               for(int i=0; i < _mvrcdList.length; i++)
+                                       _fullrcdList[++idx] = _mvrcdList[i]; 
+               }
+       }
+       
+       public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { 
+               return _rcdMaps; 
+       }
+       
+       public HashMap<Integer, HashSet<Object>> getCPRecodeMapsPartial() { 
+               return _rcdMapsPart; 
+       }
+       
+       public HashMap<Integer, HashMap<String,String>> getRecodeMaps() {
+               return _finalMaps;
+       }
+       
+       private String lookupRCDMap(int colID, String key) {
+               if( _finalMaps!=null )
+                       return _finalMaps.get(colID).get(key);
+               else { //used for cp
+                       Long tmp = _rcdMaps.get(colID).get(key);
+                       return (tmp!=null) ? Long.toString(tmp) : null;
+               }
+       }
+       
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               if( !isApplicable() )
+                       return out;
+               
+               //build and apply recode maps 
+               build(in);
+               apply(in, out);
+               
+               return out;
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               if( !isApplicable() )
+                       return;         
+
+               Iterator<String[]> iter = in.getStringRowIterator();
+               while( iter.hasNext() ) {
+                       String[] row = iter.next(); 
+                       for( int j=0; j<_colList.length; j++ ) {
+                               int colID = _colList[j]; //1-based
+                               //allocate column map if necessary
+                               if( !_rcdMaps.containsKey(colID) ) 
+                                       _rcdMaps.put(colID, new HashMap<String,Long>());
+                               //probe and build column map
+                               HashMap<String,Long> map = _rcdMaps.get(colID);
+                               String key = row[colID-1];
+                               if( key!=null && !key.isEmpty() && !map.containsKey(key) )
+                                       map.put(key, Long.valueOf(map.size()+1));
+                       }
+               }
+       }
+
+       public void buildPartial(FrameBlock in) {
+               if( !isApplicable() )
+                       return;         
+
+               //ensure allocated partial recode map
+               if( _rcdMapsPart == null )
+                       _rcdMapsPart = new HashMap<Integer, HashSet<Object>>();
+               
+               //construct partial recode map (tokens w/o codes)
+               //iterate over columns for sequential access
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       //allocate column map if necessary
+                       if( !_rcdMapsPart.containsKey(colID) ) 
+                               _rcdMapsPart.put(colID, new HashSet<Object>());
+                       HashSet<Object> map = _rcdMapsPart.get(colID);
+                       //probe and build column map
+                       for( int i=0; i<in.getNumRows(); i++ )
+                               map.add(in.get(i, colID-1));
+                       //cleanup unnecessary entries once
+                       map.remove(null);
+                       map.remove("");
+               }
+       }
+       
+       /**
+        * Method to apply transformations.
+        */
+       @Override
+       public String[] apply(String[] words) 
+       {
+               if( !isApplicable() )
+                       return words;
+               
+               //apply recode maps on relevant columns of given row
+               for(int i=0; i < _colList.length; i++) {
+                       //prepare input and get code
+                       int colID = _colList[i];
+                       String key = UtilFunctions.unquote(words[colID-1].trim());
+                       String val = lookupRCDMap(colID, key);                  
+                       // replace unseen keys with NaN 
+                       words[colID-1] = (val!=null) ? val : "NaN";
+               }
+                       
+               return words;
+       }
+       
+       @Override
+       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+               //apply recode maps column wise
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j];
+                       for( int i=0; i<in.getNumRows(); i++ ) {
+                               Object okey = in.get(i, colID-1);
+                               String key = (okey!=null) ? okey.toString() : null;
+                               String val = lookupRCDMap(colID, key);
+                               out.quickSetValue(i, colID-1, (val!=null) ?
+                                               Double.parseDouble(val) : Double.NaN);
+                       }
+               }
+               
+               return out;
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock meta) {
+               if( !isApplicable() )
+                       return meta;
+               
+               //inverse operation to initRecodeMaps
+               
+               //allocate output rows
+               int maxDistinct = 0;
+               for( int j=0; j<_colList.length; j++ )
+                       if( _rcdMaps.containsKey(_colList[j]) )
+                               maxDistinct = Math.max(maxDistinct, _rcdMaps.get(_colList[j]).size());
+               meta.ensureAllocatedColumns(maxDistinct);
+               
+               //create compact meta data representation
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       int rowID = 0;
+                       if( _rcdMaps.containsKey(_colList[j]) )
+                               for( Entry<String, Long> e : _rcdMaps.get(colID).entrySet() ) {
+                                       String tmp = constructRecodeMapEntry(e.getKey(), e.getValue());
+                                       meta.set(rowID++, colID-1, tmp);
+                               }
+                       meta.getColumnMetadata(colID-1).setNumDistinct(
+                                       _rcdMaps.get(colID).size());
+               }
+               
+               return meta;
+       }
+       
+
+       /**
+        * Construct the recodemaps from the given input frame for all 
+        * columns registered for recode.
+        * 
+        * @param meta frame block
+        */
+       public void initMetaData( FrameBlock meta ) {
+               if( meta == null || meta.getNumRows()<=0 )
+                       return;
+               
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       _rcdMaps.put(colID, meta.getRecodeMap(colID-1));
+               }
+       }
+       
+       /**
+        * Returns the Recode map entry, which consists of the concatenation of code, delimiter and token.
+        * @param token is part of Recode map
+        * @param code  is code for token 
+        * @return the concatenation of code and token with delimiter in between
+        */
+       public static String constructRecodeMapEntry(String token, Long code) {
+               return token + Lop.DATATYPE_PREFIX + code.toString();
+       }
+}
+ 
\ No newline at end of file
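
To illustrate the entry format produced by constructRecodeMapEntry() above, a small round-trip sketch (not part of the patch); the parsing half is a hypothetical inverse, assuming Lop.DATATYPE_PREFIX is the sole delimiter between token and code, as the concatenation above implies:

    import org.apache.sysml.lops.Lop;
    import org.apache.sysml.runtime.transform.encode.EncoderRecode;

    class RecodeEntrySketch {
        public static void main(String[] args) {
            // serialize: token + delimiter + code (see constructRecodeMapEntry above)
            String entry = EncoderRecode.constructRecodeMapEntry("CA", 7L);
            // hypothetical inverse: split at the last delimiter occurrence
            String delim = String.valueOf(Lop.DATATYPE_PREFIX);
            int pos = entry.lastIndexOf(delim);
            String token = entry.substring(0, pos);
            long code = Long.parseLong(entry.substring(pos + delim.length()));
            System.out.println(token + " -> " + code);
        }
    }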

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
index 62b90b4..afb7ee9 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
@@ -50,7 +50,6 @@ import org.apache.wink.json4j.JSONObject;
 
 public class TfMetaUtils 
 {
-
        public static boolean isIDSpecification(String spec) throws DMLRuntimeException {
                try {
                        JSONObject jSpec = new JSONObject(spec);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
index af2e75f..b506444 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -88,9 +88,7 @@ public class FrameFunctionTest extends AutomatedTestBase
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
        
                boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
-               boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
                OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
-               OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
                
                try
                {
@@ -126,7 +124,6 @@ public class FrameFunctionTest extends AutomatedTestBase
                        rtplatform = platformOld;
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
                        OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
-                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
                }
        }
 }
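
The test diffs in this patch all drop the ALLOW_FRAME_CSV_REBLOCK save/set/restore boilerplate while keeping the same guard idiom for the remaining optimizer flags. For reference, the idiom in isolation (sketch only; FlagGuardSketch, runWithIPA, and the Runnable body are illustrative names, not part of the patch):

    import org.apache.sysml.hops.OptimizerUtils;

    class FlagGuardSketch {
        // run a test body under a temporary IPA setting, always restoring the old value
        static void runWithIPA(boolean ipa, Runnable testBody) {
            boolean old = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
            OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = ipa;
            try {
                testBody.run();
            }
            finally {
                OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = old;
            }
        }
    }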

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
index ecc958b..c629eee 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.io.FrameWriter;
@@ -201,10 +200,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
                if( rtplatform == RUNTIME_PLATFORM.SPARK )
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
                
-               boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
-               if( ofmt.equals("csv") )
-                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
-               
                try
                {
                        int cols = multColBlks ? cols2 : cols1;
@@ -235,7 +230,6 @@ public class FrameMatrixReblockTest extends AutomatedTestBase
                finally {
                        rtplatform = platformOld;
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
                }
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
index 5066582..ceeec07 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
@@ -21,7 +21,6 @@ package org.apache.sysml.test.integration.functions.frame;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
@@ -101,10 +100,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
                        DMLScript.USE_LOCAL_SPARK_CONFIG = true;
        
                String ofmt = OutputInfo.outputInfoToStringExternal(oinfo);
-
-               boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
-               if( ofmt.equals("csv") )
-                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
                
                try
                {
@@ -148,7 +143,6 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
                finally {
                        rtplatform = platformOld;
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
index 35078f3..056e619 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/FrameCSVReadWriteTest.java
@@ -22,7 +22,6 @@ package org.apache.sysml.test.integration.functions.transform;
 import org.junit.Test;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -75,7 +74,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
        {
                //set runtime platform
                RUNTIME_PLATFORM rtold = rtplatform;
-               boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
                rtplatform = rt;
 
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
@@ -94,7 +92,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
                        programArgs = new String[]{"-explain","-args", 
                                HOME + "input/" + DATASET, output("R") };
        
-                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
                        runTest(true, false, null, -1); 
                        
                        //read input/output and compare
@@ -113,7 +110,6 @@ public class FrameCSVReadWriteTest extends AutomatedTestBase
                finally {
                        rtplatform = rtold;
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
                }
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
deleted file mode 100644
index 81c0bab..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/RunTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.transform;
-
-import java.io.IOException;
-
-import org.junit.Test;
-
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-/**
- * 
- * 
- */
-public class RunTest extends AutomatedTestBase 
-{
-       
-       private final static String TEST_NAME1 = "Transform";
-       private final static String TEST_NAME2 = "Apply";
-       private final static String TEST_DIR = "functions/transform/";
-       private final static String TEST_CLASS_DIR = TEST_DIR + RunTest.class.getSimpleName() + "/";
-       
-       private final static String HOMES_DATASET       = "homes/homes.csv";
-       //private final static String HOMES_SPEC         = "homes/homes.tfspec.json";
-       private final static String HOMES_SPEC2         = "homes/homes.tfspec2.json";
-       //private final static String HOMES_IDSPEC      = "homes/homes.tfidspec.json";
-       //private final static String HOMES_TFDATA      = "homes/homes.transformed.csv";
-       //private final static String HOMES_COLNAMES    = "homes/homes.csv.colnames";
-       
-       private final static String HOMES_NAN_DATASET   = "homes/homesNAN.csv";
-       private final static String HOMES_NAN_SPEC      = "homes/homesNAN.tfspec.json";
-       //private final static String HOMES_NAN_IDSPEC  = "homes/homesNAN.tfidspec.json";
-       private final static String HOMES_NAN_COLNAMES  = "homes/homesNAN.colnames.csv";
-       
-       private final static String HOMES_MISSING_DATASET       = "homes/homesAllMissing.csv";
-       private final static String HOMES_MISSING_SPEC          = "homes/homesAllMissing.tfspec.json";
-       private final static String HOMES_MISSING_IDSPEC        = "homes/homesAllMissing.tfidspec.json";
-       
-       @Override
-       public void setUp() 
-       {
-               TestUtils.clearAssertionInformation();
-               addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"R"}));
-       }
-       
-       // ---- NAN BinaryBlock ----
-       
-       @Test
-       public void runTestWithNAN_HybridBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "binary");
-       }
-
-       @Test
-       public void runTestWithNAN_SPHybridBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-       }
-
-       @Test
-       public void runTestWithNAN_HadoopBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "binary");
-       }
-
-       @Test
-       public void runTestWithNAN_SparkBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "binary");
-       }
-
-       // ---- NAN CSV ----
-       
-       @Test
-       public void runTestWithNAN_HybridCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID, "csv");
-       }
-
-       @Test
-       public void runTestWithNAN_SPHybridCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-       }
-
-       @Test
-       public void runTestWithNAN_HadoopCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.HADOOP, "csv");
-       }
-
-       @Test
-       public void runTestWithNAN_SparkCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_NAN_DATASET, HOMES_NAN_SPEC, HOMES_NAN_COLNAMES, false, RUNTIME_PLATFORM.SPARK, "csv");
-       }
-
-       // ---- Test2 BinaryBlock ----
-       
-       @Test
-       public void runTest2_HybridBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "binary");
-       }
-
-       @Test
-       public void runTest2_SPHybridBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-       }
-
-       @Test
-       public void runTest2_HadoopBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "binary");
-       }
-
-       @Test
-       public void runTest2_SparkBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "binary");
-       }
-
-       // ---- Test2 CSV ----
-       
-       @Test
-       public void runTest2_HybridCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID, "csv");
-       }
-
-       @Test
-       public void runTest2_SPHybridCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-       }
-
-       @Test
-       public void runTest2_HadoopCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.HADOOP, "csv");
-       }
-
-       @Test
-       public void runTest2_SparkCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_DATASET, HOMES_SPEC2, null, false, RUNTIME_PLATFORM.SPARK, "csv");
-       }
-
-       // ---- HomesMissing BinaryBlock ----
-       
-       @Test
-       public void runAllMissing_HybridBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID, "binary");
-       }
-
-       @Test
-       public void runAllMissing_SPHybridBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "binary");
-       }
-
-       @Test
-       public void runAllMissing_HadoopBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.HADOOP, "binary");
-       }
-
-       @Test
-       public void runAllMissing_SparkBB() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_SPEC, null, true, RUNTIME_PLATFORM.SPARK, "binary");
-       }
-
-       // ---- HomesMissing CSV ----
-       
-       @Test
-       public void runAllMissing_HybridCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID, "csv");
-       }
-
-       @Test
-       public void runAllMissing_SPHybridCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HYBRID_SPARK, "csv");
-       }
-
-       @Test
-       public void runAllMissing_HadoopCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.HADOOP, "csv");
-       }
-
-       @Test
-       public void runAllMissing_SparkCSV() throws DMLRuntimeException, IOException {
-               runScalingTest( HOMES_MISSING_DATASET, HOMES_MISSING_IDSPEC, null, true, RUNTIME_PLATFORM.SPARK, "csv");
-       }
-
-       // ------------------
-       
-       /**
-        * 
-        * @param sparseM1
-        * @param sparseM2
-        * @param instType
-        * @throws IOException 
-        * @throws DMLRuntimeException 
-        */
-       private void runScalingTest( String dataset, String spec, String colnames, boolean exception, RUNTIME_PLATFORM rt, String ofmt) throws IOException, DMLRuntimeException
-       {
-               RUNTIME_PLATFORM platformOld = rtplatform;
-               rtplatform = rt;
-       
-               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
-               if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-
-               try
-               {
-                       getAndLoadTestConfiguration(TEST_NAME1);
-                       
-                       /* This is for running the junit test the new way, i.e., construct the arguments directly */
-                       String HOME = SCRIPT_DIR + TEST_DIR;
-                       fullDMLScriptName = null;
-                       
-                       if (colnames == null) {
-                               fullDMLScriptName  = HOME + TEST_NAME1 + ".dml";
-                               programArgs = new String[]{"-nvargs", 
-                                       "DATA=" + HOME + "input/" + dataset,
-                                       "TFSPEC=" + HOME + "input/" + spec,
-                                       "TFMTD=" + output("tfmtd"),
-                                       "TFDATA=" + output("tfout"),
-                                       "OFMT=" + ofmt };
-                       }
-                       else {
-                               fullDMLScriptName  = HOME + TEST_NAME1 + "_colnames.dml";
-                               programArgs = new String[]{"-nvargs", 
-                                       "DATA=" + HOME + "input/" + dataset,
-                                       "TFSPEC=" + HOME + "input/" + spec,
-                                       "COLNAMES=" + HOME + "input/" + colnames,
-                                       "TFMTD=" + output("tfmtd"),
-                                       "TFDATA=" + output("tfout"),
-                                       "OFMT=" + ofmt };
-       
-                       boolean exceptionExpected = exception;
-                       runTest(true, exceptionExpected, null, -1); 
-                       
-                       fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
-                       programArgs = new String[]{"-nvargs", 
-                               "DATA=" + HOME + "input/" + dataset,
-                               "APPLYMTD=" + output("tfmtd"),  // generated above
-                               "TFMTD=" + output("test_tfmtd"),
-                               "TFDATA=" + output("test_tfout"),
-                               "OFMT=" + ofmt };
-       
-                       exceptionExpected = exception;
-                       runTest(true, exceptionExpected, null, -1); 
-                       
-               }
-               finally
-               {
-                       rtplatform = platformOld;
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
-               }
-       }       
-}
\ No newline at end of file
