http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 27058de..a4cfaa6 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -1,549 +1,549 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.parser.DataExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.io.MatrixReader;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.sysml.utils.JSONHelper;
-
-
-@SuppressWarnings("deprecation")
-public class TfUtils implements Serializable{
-       
-       private static final long serialVersionUID = 526252850872633125L;
-
-       private OmitAgent _oa = null;
-       private MVImputeAgent _mia = null;
-       private RecodeAgent _ra = null; 
-       private BinAgent _ba = null;
-       private DummycodeAgent _da = null;
-       
-       private long _numRecordsInPartFile;             // Total number of records in the data file
-       private long _numValidRecords;                  // (_numRecordsInPartFile - #of omitted records)
-       private long _numTransformedRows;               // Number of rows after applying transformations
-       private long _numTransformedColumns;    // Number of columns after applying transformations
-
-       private String _headerLine = null;
-       private boolean _hasHeader;
-       private Pattern _delim = null;
-       private String _delimString = null;
-       private String[] _NAstrings = null;
-       private String[] _outputColumnNames = null;
-       private long _numInputCols = -1;
-       
-       private String _tfMtdDir = null;
-       private String _specFile = null;
-       private String _offsetFile = null;
-       private String _tmpDir = null;
-       private String _outputPath = null;
-       
-       protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
-                       throws IOException {
-               // check non-existing file
-               if (!fs.exists(path))
-                       if ( err )
-                               throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
-                       else
-                               return false;
-
-               // check for empty file
-               if (MapReduceTool.isFileEmpty(fs, path.toString()))
-                       if ( err )
-                       throw new EOFException("Empty input file " + path.toString() + ".");
-                       else
-                               return false;
-               
-               return true;
-       }
-       
-       public static String getPartFileName(JobConf job) throws IOException {
-               FileSystem fs = FileSystem.get(job);
-               Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
-               return thisPath.toString();
-       }
-       
-       public static boolean isPartFileWithHeader(JobConf job) throws IOException {
-               FileSystem fs = FileSystem.get(job);
-               
-               String thisfile=getPartFileName(job);
-               Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
-               
-               if(thisfile.toString().equals(smallestFilePath.toString()))
-                       return true;
-               else
-                       return false;
-       }
-       
-       public static JSONObject readSpec(FileSystem fs, String specFile) throws IOException {
-               BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(specFile))));
-               JSONObject obj = JSONHelper.parse(br);
-               br.close();
-               return obj;
-       }
-       
-       /**
-        * Prepare NA strings so that they can be sent to workers via JobConf.
-        * A "dummy" string is added at the end to handle the case of empty 
strings.
-        * @param na
-        * @return
-        */
-       public static String prepNAStrings(String na) {
-               return na  + DataExpression.DELIM_NA_STRING_SEP + "dummy";
-       }
-       
-       public static String[] parseNAStrings(String na) 
-       {
-               if ( na == null )
-                       return null;
-               
-               String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
-               return tmp; //Arrays.copyOf(tmp, tmp.length-1);
-       }
-       
-       public static String[] parseNAStrings(JobConf job) 
-       {
-               return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
-       }
-       
-       private void createAgents(JSONObject spec) throws IOException, JSONException {
-               _oa = new OmitAgent(spec);
-               _mia = new MVImputeAgent(spec);
-               _ra = new RecodeAgent(spec);
-               _ba = new BinAgent(spec);
-               _da = new DummycodeAgent(spec, _numInputCols);
-       }
-       
-       public void setupAgents(OmitAgent oa, MVImputeAgent mia, RecodeAgent ra, BinAgent ba, DummycodeAgent da)  {
-               _oa = oa;
-               _mia = mia;
-               _ra = ra;
-               _ba = ba;
-               _da = da;
-       }
-       
-       private void parseColumnNames() {
-               _outputColumnNames = _delim.split(_headerLine, -1);
-               for(int i=0; i < _outputColumnNames.length; i++)
-                       _outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
-       }
-       
-       private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
-       {
-               _numRecordsInPartFile = 0;
-               _numValidRecords = 0;
-               _numTransformedRows = 0;
-               _numTransformedColumns = 0;
-               
-               _headerLine = headerLine;
-               _hasHeader = hasHeader;
-               _delimString = delim;
-               _delim = Pattern.compile(Pattern.quote(delim));
-               _NAstrings = naStrings;
-               _numInputCols = numCols;
-               _offsetFile = offsetFile;
-               _tmpDir = tmpPath;
-               _outputPath = outputPath;
-               
-               parseColumnNames();             
-               createAgents(spec);
-       }
-       
-       public TfUtils(JobConf job, boolean minimal) 
-               throws IOException, JSONException 
-       {
-               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-                       ConfigurationManager.setCachedJobConf(job);
-               }
-               
-               _NAstrings = TfUtils.parseNAStrings(job);
-               _specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-               
-               FileSystem fs = FileSystem.get(job);
-               JSONObject spec = TfUtils.readSpec(fs, _specFile);
-               
-               _oa = new OmitAgent(spec);
-       }
-       
-       // called from GenTFMtdMapper, ApplyTf (Hadoop)
-       public TfUtils(JobConf job) 
-               throws IOException, JSONException 
-       {
-               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-                       ConfigurationManager.setCachedJobConf(job);
-               }
-               
-               boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
-               //Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
-               String[] naStrings = TfUtils.parseNAStrings(job);
-               
-               long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) );            // #of columns in input data
-                       
-               String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-               String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
-               String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
-               String outputPath = FileOutputFormat.getOutputPath(job).toString();
-               FileSystem fs = FileSystem.get(job);
-               JSONObject spec = TfUtils.readSpec(fs, specFile);
-               
-               init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
-       }
-       
-       // called from GenTfMtdReducer 
-       public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
-       {
-               this(job);
-               _tfMtdDir = tfMtdDir;
-       }
-       
-       // called from GenTFMtdReducer and ApplyTf (Spark)
-       public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
-               init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
-               _tfMtdDir = tfMtdDir;
-       }
-       
-       public void incrValid() { _numValidRecords++; }
-       public long getValid()  { return _numValidRecords; }
-       public long getTotal()  { return _numRecordsInPartFile; }
-       public long getNumTransformedRows()     { return _numTransformedRows; }
-       public long getNumTransformedColumns()  { return _numTransformedColumns; }
-       
-       public String getHeader()               { return _headerLine; }
-       public boolean hasHeader()              { return _hasHeader; }
-       public String getDelimString()  { return _delimString; }
-       public Pattern getDelim()               { return _delim; }
-       public String[] getNAStrings()  { return _NAstrings; }
-       public long getNumCols()                { return _numInputCols; }
-       
-       public String getSpecFile()     { return _specFile; }
-       public String getTfMtdDir()     { return _tfMtdDir; }
-       public String getOffsetFile()   { return _offsetFile; }
-       public String getTmpDir()               { return _tmpDir; }
-       public String getOutputPath()   { return _outputPath; }
-       
-       public String getName(int colID) { return _outputColumnNames[colID-1]; }
-       
-       public void setValid(long n) { _numValidRecords = n;}
-       public void incrTotal() { _numRecordsInPartFile++; }
-       public void setTotal(long n) { _numRecordsInPartFile = n;}
-       
-       public OmitAgent          getOmitAgent()        {       return _oa; }
-       public MVImputeAgent  getMVImputeAgent(){       return _mia;}
-       public RecodeAgent        getRecodeAgent()      {       return _ra; }
-       public BinAgent           getBinAgent()         {       return _ba; }
-       public DummycodeAgent getDummycodeAgent() { return _da; }
-       
-       /**
-        * Function that checks if the given string is one of NA strings.
-        * 
-        * @param w
-        * @return
-        */
-       public boolean isNA(String w) {
-               if(_NAstrings == null)
-                       return false;
-               
-               for(String na : _NAstrings) {
-                       if(w.equals(na))
-                               return true;
-               }
-               return false;
-       }
-       
-       public String[] getWords(Text line)
-       {
-               return getWords(line.toString());
-       }
-       
-
-       public String[] getWords(String line) 
-       {
-               return getDelim().split(line.trim(), -1);
-       }
-       
-       /**
-        * Process a given row to construct transformation metadata.
-        * 
-        * @param line
-        * @return
-        * @throws IOException
-        */
-       public String[] prepareTfMtd(String line) throws IOException {
-               String[] words = getWords(line);
-               if(!getOmitAgent().omit(words, this))
-               {
-                       getMVImputeAgent().prepare(words, this);
-                       getRecodeAgent().prepare(words, this);
-                       getBinAgent().prepare(words, this);
-                       incrValid();;
-               }
-               incrTotal();
-               
-               return words;
-       }
-       
-       public void loadTfMetadata() throws IOException 
-       {
-               JobConf job = ConfigurationManager.getCachedJobConf();
-               loadTfMetadata(job, false);
-       }
-       
-       public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
-       {
-               Path tfMtdDir = null; 
-               FileSystem fs = null;
-               
-               if(fromLocalFS) {
-                       // metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
-                       tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
-                       fs = FileSystem.getLocal(job);
-               }
-               else {
-                       fs = FileSystem.get(job);
-                       tfMtdDir = new Path(getTfMtdDir());
-               }
-               
-               // load transformation metadata 
-               getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-               getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-               getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
-               
-               // associate recode maps and bin definitions with dummycoding agent,
-               // as recoded and binned columns are typically dummycoded
-               getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
-               getDummycodeAgent().setNumBins(getBinAgent().getBinList(), getBinAgent().getNumBins());
-               getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
-
-       }
-       
-       /*public void loadTfMetadata () throws IOException
-       {
-               Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0];
-               FileSystem localFS = FileSystem.getLocal(_rJob);
-               
-               loadTfMetadata(_rJob, localFS, tfMtdDir);
-               
-               FileSystem fs;
-               fs = FileSystem.get(_rJob);
-               Path thisPath=new Path(_rJob.get("map.input.file")).makeQualified(fs);
-               String thisfile=thisPath.toString();
-                       
-               Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
-               if(thisfile.toString().equals(smallestFilePath.toString()))
-                       _partFileWithHeader=true;
-               else
-                       _partFileWithHeader = false;
-       }*/
-
-
-       public String processHeaderLine() throws IOException 
-       {
-               FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
-               String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
-               getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
-               
-               // write header information (before and after transformation) to temporary path
-               // these files are copied into txMtdPath, once the ApplyTf job is complete.
-               DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
-
-               return dcdHeader;
-               //_numTransformedColumns = getDelim().split(dcdHeader, -1).length; 
-               //return _numTransformedColumns;
-       }
-
-       public boolean omit(String[] words) {
-               if(getOmitAgent() == null)
-                       return false;
-               return getOmitAgent().omit(words, this);
-       }
-       
-       
-       public String[] apply(String[] words) {
-               return apply(words, false);
-       }
-       
-       /**
-        * Function to apply transformation metadata on a given row.
-        * 
-        * @param words
-        * @param optimizeMaps
-        * @return
-        */
-       public String[] apply ( String[] words, boolean optimizeMaps ) 
-       {
-               words = getMVImputeAgent().apply(words, this);
-               
-               if(optimizeMaps)
-                       // specific case of transform() invoked from CP (to save boxing and unboxing)
-                       words = getRecodeAgent().cp_apply(words, this);
-               else
-                       words = getRecodeAgent().apply(words, this);
-
-               words = getBinAgent().apply(words, this);
-               words = getDummycodeAgent().apply(words, this);
-               
-               _numTransformedRows++;
-               
-               return words;
-       }
-       
-       public void check(String []words) throws DMLRuntimeException 
-       {
-               boolean checkEmptyString = ( getNAStrings() != null );
-               if ( checkEmptyString ) 
-               {
-                       final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-                       for(int i=0; i<words.length; i++) 
-                               if ( words[i] != null && words[i].equals(""))
-                                       throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-               }
-       }
-       
-       public String checkAndPrepOutputString(String []words) throws DMLRuntimeException 
-       {
-               return checkAndPrepOutputString(words, new StringBuilder());
-       }
-       
-       public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException 
-       {
-               /*
-                * Check if empty strings ("") have to be handled.
-                * 
-                * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
-                * When na.strings are provided, then "" is considered a missing value indicator, and the 
-                * user is expected to provide an appropriate imputation method. Therefore, when na.strings 
-                * are provided, "" encountered in any column (after all transformations are applied) 
-                * denotes an erroneous condition.  
-                */
-               boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
-               
-               //StringBuilder sb = new StringBuilder();
-               sb.setLength(0);
-               int i =0;
-               
-               if ( checkEmptyString ) 
-               {
-                       final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
-                       if ( words[0] != null ) 
-                               if ( words[0].equals("") )
-                                       throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
-                               else 
-                                       sb.append(words[0]);
-                       else
-                               sb.append("0");
-                       
-                       for(i=1; i<words.length; i++) 
-                       {
-                               sb.append(_delimString);
-                               
-                               if ( words[i] != null ) 
-                                       if ( words[i].equals("") )
-                                               throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
-                                       else 
-                                               sb.append(words[i]);
-                               else
-                                       sb.append("0");
-                       }
-               }
-               else 
-               {
-                       sb.append(words[0] != null ? words[0] : "0");
-                       for(i=1; i<words.length; i++) 
-                       {
-                               sb.append(_delimString);
-                               sb.append(words[i] != null ? words[i] : "0");
-                       }
-               }
-               
-               return sb.toString();
-       }
-
-       private Reader initOffsetsReader(JobConf job) throws IOException 
-       {
-               Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-               FileSystem fs = FileSystem.get(job);
-               Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
-               if ( files.length != 1 )
-                       throw new IOException("Expecting a single file under 
counters file: " + path.toString());
-               
-               Reader reader = new SequenceFile.Reader(fs, files[0], job);
-               
-               return reader;
-       }
-       
-       /**
-        * Function to generate custom file names (transform-part-.....) for
-        * mappers' output for ApplyTfCSV job. The idea is to find the index 
-        * of (thisfile, fileoffset) in the list of all offsets from the 
-        * counters/offsets file, which was generated from either GenTfMtdMR
-        * or AssignRowIDMR job.
-        * 
-        */
-       public String getPartFileID(JobConf job, long offset) throws IOException
-       {
-               Reader reader = initOffsetsReader(job);
-               
-               ByteWritable key=new ByteWritable();
-               OffsetCount value=new OffsetCount();
-               String thisFile = TfUtils.getPartFileName(job);
-               
-               int id = 0;
-               while (reader.next(key, value)) {
-                       if ( thisFile.equals(value.filename) && value.fileOffset == offset ) 
-                               break;
-                       id++;
-               }
-               reader.close();
-               
-               String sid = Integer.toString(id);
-               char[] carr = new char[5-sid.length()];
-               Arrays.fill(carr, '0');
-               String ret = (new String(carr)).concat(sid);
-               
-               return ret;
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.DataExpression;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.matrix.CSVReblockMR;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.utils.JSONHelper;
+
+
+@SuppressWarnings("deprecation")
+public class TfUtils implements Serializable{
+       
+       private static final long serialVersionUID = 526252850872633125L;
+
+       private OmitAgent _oa = null;
+       private MVImputeAgent _mia = null;
+       private RecodeAgent _ra = null; 
+       private BinAgent _ba = null;
+       private DummycodeAgent _da = null;
+       
+       private long _numRecordsInPartFile;             // Total number of records in the data file
+       private long _numValidRecords;                  // (_numRecordsInPartFile - #of omitted records)
+       private long _numTransformedRows;               // Number of rows after applying transformations
+       private long _numTransformedColumns;    // Number of columns after applying transformations
+
+       private String _headerLine = null;
+       private boolean _hasHeader;
+       private Pattern _delim = null;
+       private String _delimString = null;
+       private String[] _NAstrings = null;
+       private String[] _outputColumnNames = null;
+       private long _numInputCols = -1;
+       
+       private String _tfMtdDir = null;
+       private String _specFile = null;
+       private String _offsetFile = null;
+       private String _tmpDir = null;
+       private String _outputPath = null;
+       
+       protected static boolean checkValidInputFile(FileSystem fs, Path path, boolean err)
+                       throws IOException {
+               // check non-existing file
+               if (!fs.exists(path))
+                       if ( err )
+                               throw new IOException("File " + path.toString() + " does not exist on HDFS/LFS.");
+                       else
+                               return false;
+
+               // check for empty file
+               if (MapReduceTool.isFileEmpty(fs, path.toString()))
+                       if ( err )
+                       throw new EOFException("Empty input file " + path.toString() + ".");
+                       else
+                               return false;
+               
+               return true;
+       }
+       
+       public static String getPartFileName(JobConf job) throws IOException {
+               FileSystem fs = FileSystem.get(job);
+               Path thisPath=new Path(job.get("map.input.file")).makeQualified(fs);
+               return thisPath.toString();
+       }
+       
+       public static boolean isPartFileWithHeader(JobConf job) throws IOException {
+               FileSystem fs = FileSystem.get(job);
+               
+               String thisfile=getPartFileName(job);
+               Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
+               
+               if(thisfile.toString().equals(smallestFilePath.toString()))
+                       return true;
+               else
+                       return false;
+       }
+       
+       public static JSONObject readSpec(FileSystem fs, String specFile) throws IOException {
+               BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(specFile))));
+               JSONObject obj = JSONHelper.parse(br);
+               br.close();
+               return obj;
+       }
+       
+       /**
+        * Prepare NA strings so that they can be sent to workers via JobConf.
+        * A "dummy" string is added at the end to handle the case of empty 
strings.
+        * @param na
+        * @return
+        */
+       public static String prepNAStrings(String na) {
+               return na  + DataExpression.DELIM_NA_STRING_SEP + "dummy";
+       }
+       
+       public static String[] parseNAStrings(String na) 
+       {
+               if ( na == null )
+                       return null;
+               
+               String[] tmp = Pattern.compile(Pattern.quote(DataExpression.DELIM_NA_STRING_SEP)).split(na, -1);
+               return tmp; //Arrays.copyOf(tmp, tmp.length-1);
+       }
+       
+       public static String[] parseNAStrings(JobConf job) 
+       {
+               return parseNAStrings(job.get(MRJobConfiguration.TF_NA_STRINGS));
+       }
+       
+       private void createAgents(JSONObject spec) throws IOException, JSONException {
+               _oa = new OmitAgent(spec);
+               _mia = new MVImputeAgent(spec);
+               _ra = new RecodeAgent(spec);
+               _ba = new BinAgent(spec);
+               _da = new DummycodeAgent(spec, _numInputCols);
+       }
+       
+       public void setupAgents(OmitAgent oa, MVImputeAgent mia, RecodeAgent ra, BinAgent ba, DummycodeAgent da)  {
+               _oa = oa;
+               _mia = mia;
+               _ra = ra;
+               _ba = ba;
+               _da = da;
+       }
+       
+       private void parseColumnNames() {
+               _outputColumnNames = _delim.split(_headerLine, -1);
+               for(int i=0; i < _outputColumnNames.length; i++)
+                       _outputColumnNames[i] = UtilFunctions.unquote(_outputColumnNames[i]);
+       }
+       
+       private void init(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long numCols, String offsetFile, String tmpPath, String outputPath) throws IOException, JSONException
+       {
+               _numRecordsInPartFile = 0;
+               _numValidRecords = 0;
+               _numTransformedRows = 0;
+               _numTransformedColumns = 0;
+               
+               _headerLine = headerLine;
+               _hasHeader = hasHeader;
+               _delimString = delim;
+               _delim = Pattern.compile(Pattern.quote(delim));
+               _NAstrings = naStrings;
+               _numInputCols = numCols;
+               _offsetFile = offsetFile;
+               _tmpDir = tmpPath;
+               _outputPath = outputPath;
+               
+               parseColumnNames();             
+               createAgents(spec);
+       }
+       
+       public TfUtils(JobConf job, boolean minimal) 
+               throws IOException, JSONException 
+       {
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                       ConfigurationManager.setCachedJobConf(job);
+               }
+               
+               _NAstrings = TfUtils.parseNAStrings(job);
+               _specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+               
+               FileSystem fs = FileSystem.get(job);
+               JSONObject spec = TfUtils.readSpec(fs, _specFile);
+               
+               _oa = new OmitAgent(spec);
+       }
+       
+       // called from GenTFMtdMapper, ApplyTf (Hadoop)
+       public TfUtils(JobConf job) 
+               throws IOException, JSONException 
+       {
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                       ConfigurationManager.setCachedJobConf(job);
+               }
+               
+               boolean hasHeader = Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
+               //Pattern delim = Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
+               String[] naStrings = TfUtils.parseNAStrings(job);
+               
+               long numCols = UtilFunctions.parseToLong( job.get(MRJobConfiguration.TF_NUM_COLS) );            // #of columns in input data
+                       
+               String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+               String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
+               String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
+               String outputPath = FileOutputFormat.getOutputPath(job).toString();
+               FileSystem fs = FileSystem.get(job);
+               JSONObject spec = TfUtils.readSpec(fs, specFile);
+               
+               init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, tmpPath, outputPath);
+       }
+       
+       // called from GenTfMtdReducer 
+       public TfUtils(JobConf job, String tfMtdDir) throws IOException, JSONException 
+       {
+               this(job);
+               _tfMtdDir = tfMtdDir;
+       }
+       
+       // called from GenTFMtdReducer and ApplyTf (Spark)
+       public TfUtils(String headerLine, boolean hasHeader, String delim, String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String offsetFile, String tmpPath) throws IOException, JSONException {
+               init (headerLine, hasHeader, delim, naStrings, spec, ncol, offsetFile, tmpPath, null);
+               _tfMtdDir = tfMtdDir;
+       }
+       
+       public void incrValid() { _numValidRecords++; }
+       public long getValid()  { return _numValidRecords; }
+       public long getTotal()  { return _numRecordsInPartFile; }
+       public long getNumTransformedRows()     { return _numTransformedRows; }
+       public long getNumTransformedColumns()  { return _numTransformedColumns; }
+       
+       public String getHeader()               { return _headerLine; }
+       public boolean hasHeader()              { return _hasHeader; }
+       public String getDelimString()  { return _delimString; }
+       public Pattern getDelim()               { return _delim; }
+       public String[] getNAStrings()  { return _NAstrings; }
+       public long getNumCols()                { return _numInputCols; }
+       
+       public String getSpecFile()     { return _specFile; }
+       public String getTfMtdDir()     { return _tfMtdDir; }
+       public String getOffsetFile()   { return _offsetFile; }
+       public String getTmpDir()               { return _tmpDir; }
+       public String getOutputPath()   { return _outputPath; }
+       
+       public String getName(int colID) { return _outputColumnNames[colID-1]; }
+       
+       public void setValid(long n) { _numValidRecords = n;}
+       public void incrTotal() { _numRecordsInPartFile++; }
+       public void setTotal(long n) { _numRecordsInPartFile = n;}
+       
+       public OmitAgent          getOmitAgent()        {       return _oa; }
+       public MVImputeAgent  getMVImputeAgent(){       return _mia;}
+       public RecodeAgent        getRecodeAgent()      {       return _ra; }
+       public BinAgent           getBinAgent()         {       return _ba; }
+       public DummycodeAgent getDummycodeAgent() { return _da; }
+       
+       /**
+        * Function that checks if the given string is one of NA strings.
+        * 
+        * @param w
+        * @return
+        */
+       public boolean isNA(String w) {
+               if(_NAstrings == null)
+                       return false;
+               
+               for(String na : _NAstrings) {
+                       if(w.equals(na))
+                               return true;
+               }
+               return false;
+       }
+       
+       public String[] getWords(Text line)
+       {
+               return getWords(line.toString());
+       }
+       
+
+       public String[] getWords(String line) 
+       {
+               return getDelim().split(line.trim(), -1);
+       }
+       
+       /**
+        * Process a given row to construct transformation metadata.
+        * 
+        * @param line
+        * @return
+        * @throws IOException
+        */
+       public String[] prepareTfMtd(String line) throws IOException {
+               String[] words = getWords(line);
+               if(!getOmitAgent().omit(words, this))
+               {
+                       getMVImputeAgent().prepare(words, this);
+                       getRecodeAgent().prepare(words, this);
+                       getBinAgent().prepare(words, this);
+                       incrValid();;
+               }
+               incrTotal();
+               
+               return words;
+       }
+       
+       public void loadTfMetadata() throws IOException 
+       {
+               JobConf job = ConfigurationManager.getCachedJobConf();
+               loadTfMetadata(job, false);
+       }
+       
+       public void loadTfMetadata(JobConf job, boolean fromLocalFS) throws IOException
+       {
+               Path tfMtdDir = null; 
+               FileSystem fs = null;
+               
+               if(fromLocalFS) {
+                       // metadata must be read from local file system (e.g., distributed cache in the case of Hadoop)
+                       tfMtdDir = (DistributedCache.getLocalCacheFiles(job))[0];
+                       fs = FileSystem.getLocal(job);
+               }
+               else {
+                       fs = FileSystem.get(job);
+                       tfMtdDir = new Path(getTfMtdDir());
+               }
+               
+               // load transformation metadata 
+               getMVImputeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+               getRecodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+               getBinAgent().loadTxMtd(job, fs, tfMtdDir, this);
+               
+               // associate recode maps and bin definitions with dummycoding agent,
+               // as recoded and binned columns are typically dummycoded
+               getDummycodeAgent().setRecodeMaps( getRecodeAgent().getRecodeMaps() );
+               getDummycodeAgent().setNumBins(getBinAgent().getBinList(), getBinAgent().getNumBins());
+               getDummycodeAgent().loadTxMtd(job, fs, tfMtdDir, this);
+
+       }
+       
+       /*public void loadTfMetadata () throws IOException
+       {
+               Path tfMtdDir = (DistributedCache.getLocalCacheFiles(_rJob))[0];
+               FileSystem localFS = FileSystem.getLocal(_rJob);
+               
+               loadTfMetadata(_rJob, localFS, tfMtdDir);
+               
+               FileSystem fs;
+               fs = FileSystem.get(_rJob);
+               Path thisPath=new Path(_rJob.get("map.input.file")).makeQualified(fs);
+               String thisfile=thisPath.toString();
+                       
+               Path smallestFilePath=new Path(_rJob.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
+               if(thisfile.toString().equals(smallestFilePath.toString()))
+                       _partFileWithHeader=true;
+               else
+                       _partFileWithHeader = false;
+       }*/
+
+
+       public String processHeaderLine() throws IOException 
+       {
+               FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
+               String dcdHeader = getDummycodeAgent().constructDummycodedHeader(getHeader(), getDelim());
+               getDummycodeAgent().genDcdMapsAndColTypes(fs, getTmpDir(), (int) getNumCols(), this);
+               
+               // write header information (before and after transformation) to temporary path
+               // these files are copied into txMtdPath, once the ApplyTf job is complete.
+               DataTransform.generateHeaderFiles(fs, getTmpDir(), getHeader(), dcdHeader);
+
+               return dcdHeader;
+               //_numTransformedColumns = getDelim().split(dcdHeader, -1).length; 
+               //return _numTransformedColumns;
+       }
+
+       public boolean omit(String[] words) {
+               if(getOmitAgent() == null)
+                       return false;
+               return getOmitAgent().omit(words, this);
+       }
+       
+       
+       public String[] apply(String[] words) {
+               return apply(words, false);
+       }
+       
+       /**
+        * Function to apply transformation metadata on a given row.
+        * 
+        * @param words
+        * @param optimizeMaps
+        * @return
+        */
+       public String[] apply ( String[] words, boolean optimizeMaps ) 
+       {
+               words = getMVImputeAgent().apply(words, this);
+               
+               if(optimizeMaps)
+                       // specific case of transform() invoked from CP (to save boxing and unboxing)
+                       words = getRecodeAgent().cp_apply(words, this);
+               else
+                       words = getRecodeAgent().apply(words, this);
+
+               words = getBinAgent().apply(words, this);
+               words = getDummycodeAgent().apply(words, this);
+               
+               _numTransformedRows++;
+               
+               return words;
+       }
+       
+       public void check(String []words) throws DMLRuntimeException 
+       {
+               boolean checkEmptyString = ( getNAStrings() != null );
+               if ( checkEmptyString ) 
+               {
+                       final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
+                       for(int i=0; i<words.length; i++) 
+                               if ( words[i] != null && words[i].equals(""))
+                                       throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
+               }
+       }
+       
+       public String checkAndPrepOutputString(String []words) throws DMLRuntimeException 
+       {
+               return checkAndPrepOutputString(words, new StringBuilder());
+       }
+       
+       public String checkAndPrepOutputString(String []words, StringBuilder sb) throws DMLRuntimeException 
+       {
+               /*
+                * Check if empty strings ("") have to be handled.
+                * 
+                * Unless na.strings are provided, empty strings are (implicitly) considered as value zero.
+                * When na.strings are provided, then "" is considered a missing value indicator, and the 
+                * user is expected to provide an appropriate imputation method. Therefore, when na.strings 
+                * are provided, "" encountered in any column (after all transformations are applied) 
+                * denotes an erroneous condition.  
+                */
+               boolean checkEmptyString = ( getNAStrings() != null ); //&& !MVImputeAgent.isNA("", TransformationAgent.NAstrings) ) {
+               
+               //StringBuilder sb = new StringBuilder();
+               sb.setLength(0);
+               int i =0;
+               
+               if ( checkEmptyString ) 
+               {
+                       final String msg = "When na.strings are provided, empty string \"\" is considered as a missing value, and it must be imputed appropriately. Encountered an unhandled empty string in column ID: ";
+                       if ( words[0] != null ) 
+                               if ( words[0].equals("") )
+                                       throw new DMLRuntimeException( msg + getDummycodeAgent().mapDcdColumnID(1));
+                               else 
+                                       sb.append(words[0]);
+                       else
+                               sb.append("0");
+                       
+                       for(i=1; i<words.length; i++) 
+                       {
+                               sb.append(_delimString);
+                               
+                               if ( words[i] != null ) 
+                                       if ( words[i].equals("") )
+                                               throw new DMLRuntimeException(msg + getDummycodeAgent().mapDcdColumnID(i+1));
+                                       else 
+                                               sb.append(words[i]);
+                               else
+                                       sb.append("0");
+                       }
+               }
+               else 
+               {
+                       sb.append(words[0] != null ? words[0] : "0");
+                       for(i=1; i<words.length; i++) 
+                       {
+                               sb.append(_delimString);
+                               sb.append(words[i] != null ? words[i] : "0");
+                       }
+               }
+               
+               return sb.toString();
+       }
+
+       private Reader initOffsetsReader(JobConf job) throws IOException 
+       {
+               Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
+               FileSystem fs = FileSystem.get(job);
+               Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
+               if ( files.length != 1 )
+                       throw new IOException("Expecting a single file under 
counters file: " + path.toString());
+               
+               Reader reader = new SequenceFile.Reader(fs, files[0], job);
+               
+               return reader;
+       }
+       
+       /**
+        * Function to generate custom file names (transform-part-.....) for
+        * mappers' output for ApplyTfCSV job. The idea is to find the index 
+        * of (thisfile, fileoffset) in the list of all offsets from the 
+        * counters/offsets file, which was generated from either GenTfMtdMR
+        * or AssignRowIDMR job.
+        * 
+        */
+       public String getPartFileID(JobConf job, long offset) throws IOException
+       {
+               Reader reader = initOffsetsReader(job);
+               
+               ByteWritable key=new ByteWritable();
+               OffsetCount value=new OffsetCount();
+               String thisFile = TfUtils.getPartFileName(job);
+               
+               int id = 0;
+               while (reader.next(key, value)) {
+                       if ( thisFile.equals(value.filename) && value.fileOffset == offset ) 
+                               break;
+                       id++;
+               }
+               reader.close();
+               
+               String sid = Integer.toString(id);
+               char[] carr = new char[5-sid.length()];
+               Arrays.fill(carr, '0');
+               String ret = (new String(carr)).concat(sid);
+               
+               return ret;
+       }
+}
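
A note on the tail of getPartFileID() above: it renders the matched offset index as a fixed-width, zero-padded five-character string (e.g., 7 becomes "00007") by filling a char[] with '0' characters and concatenating. A minimal standalone sketch of the same scheme, using String.format and a hypothetical class name that is not part of the SystemML sources:

    // Sketch of the zero-padding at the end of TfUtils.getPartFileID().
    public class PartFileIdDemo {
        static String padded(int id) {
            // %05d left-pads with zeros up to five digits. The manual char[]
            // fill in getPartFileID() assumes the id fits in five digits,
            // whereas String.format simply stops padding beyond that width.
            return String.format("%05d", id);
        }

        public static void main(String[] args) {
            System.out.println(padded(7));    // 00007
            System.out.println(padded(123));  // 00123
        }
    }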

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
index e818089..2c5e37f 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TransformationAgent.java
@@ -1,93 +1,93 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-
-public abstract class TransformationAgent implements Serializable {
-       
-       private static final long serialVersionUID = -2995384194257356337L;
-       
-       public static enum TX_METHOD { 
-               IMPUTE ("impute"), 
-               RECODE ("recode"), 
-               BIN ("bin"), 
-               DUMMYCODE ("dummycode"), 
-               SCALE ("scale"),
-               OMIT ("omit"),
-               MVRCD ("mvrcd");
-               
-               private String _name;
-               
-               TX_METHOD(String name) { _name = name; }
-               
-               public String toString() {
-                       return _name;
-               }
-       }
-       
-       protected static String JSON_ATTRS      = "attributes"; 
-       protected static String JSON_MTHD       = "methods"; 
-       protected static String JSON_CONSTS = "constants"; 
-       protected static String JSON_NBINS      = "numbins"; 
-       
-       protected static final String MV_FILE_SUFFIX            = ".impute";
-       protected static final String RCD_MAP_FILE_SUFFIX       = ".map";
-       protected static final String NDISTINCT_FILE_SUFFIX = ".ndistinct";
-       protected static final String MODE_FILE_SUFFIX          = ".mode";
-       protected static final String BIN_FILE_SUFFIX           = ".bin";
-       protected static final String SCALE_FILE_SUFFIX         = ".scale";
-       protected static final String DCD_FILE_NAME             = "dummyCodeMaps.csv";
-       protected static final String COLTYPES_FILE_NAME        = "coltypes.csv";
-       
-       protected static final String TXMTD_SEP         = ",";
-       protected static final String DCD_NAME_SEP      = "_";
-       
-       protected static final String OUT_HEADER = "column.names";
-       protected static final String OUT_DCD_HEADER = "dummycoded.column.names";
-       
-       abstract public void print();
-       abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
-       abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
-       
-       abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
-       abstract public String[] apply(String[] words, TfUtils agents);
-       
-       protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID }
-       protected byte columnTypeToID(ColumnTypes type) throws IOException { 
-               switch(type) 
-               {
-               case SCALE: return 1;
-               case NOMINAL: return 2;
-               case ORDINAL: return 3;
-               case DUMMYCODED: return 1; // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically.
-               default:
-                       throw new IOException("Invalid Column Type: " + type);
-               }
-       }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+
+public abstract class TransformationAgent implements Serializable {
+       
+       private static final long serialVersionUID = -2995384194257356337L;
+       
+       public static enum TX_METHOD { 
+               IMPUTE ("impute"), 
+               RECODE ("recode"), 
+               BIN ("bin"), 
+               DUMMYCODE ("dummycode"), 
+               SCALE ("scale"),
+               OMIT ("omit"),
+               MVRCD ("mvrcd");
+               
+               private String _name;
+               
+               TX_METHOD(String name) { _name = name; }
+               
+               public String toString() {
+                       return _name;
+               }
+       }
+       
+       protected static String JSON_ATTRS      = "attributes"; 
+       protected static String JSON_MTHD       = "methods"; 
+       protected static String JSON_CONSTS = "constants"; 
+       protected static String JSON_NBINS      = "numbins"; 
+       
+       protected static final String MV_FILE_SUFFIX            = ".impute";
+       protected static final String RCD_MAP_FILE_SUFFIX       = ".map";
+       protected static final String NDISTINCT_FILE_SUFFIX = ".ndistinct";
+       protected static final String MODE_FILE_SUFFIX          = ".mode";
+       protected static final String BIN_FILE_SUFFIX           = ".bin";
+       protected static final String SCALE_FILE_SUFFIX         = ".scale";
+       protected static final String DCD_FILE_NAME             = "dummyCodeMaps.csv";
+       protected static final String COLTYPES_FILE_NAME        = "coltypes.csv";
+       
+       protected static final String TXMTD_SEP         = ",";
+       protected static final String DCD_NAME_SEP      = "_";
+       
+       protected static final String OUT_HEADER = "column.names";
+       protected static final String OUT_DCD_HEADER = "dummycoded.column.names";
+       
+       abstract public void print();
+       abstract public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException;
+       abstract public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException;
+       
+       abstract public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException;
+       abstract public String[] apply(String[] words, TfUtils agents);
+       
+       protected enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID }
+       protected byte columnTypeToID(ColumnTypes type) throws IOException { 
+               switch(type) 
+               {
+               case SCALE: return 1;
+               case NOMINAL: return 2;
+               case ORDINAL: return 3;
+               case DUMMYCODED: return 1; // Ideally, dummycoded columns should be of a different type. Treating them as SCALE is incorrect, semantically.
+               default:
+                       throw new IOException("Invalid Column Type: " + type);
+               }
+       }
+}
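
A minimal, hypothetical Java sketch (not part of the commit; the class name and the unchecked exception are my own simplifications) of the numeric encoding that columnTypeToID() above produces, e.g. for the coltypes.csv metadata file. It also surfaces the caveat from the inline comment: DUMMYCODED deliberately aliases to the SCALE id.

public class ColumnTypeIdDemo {
    enum ColumnTypes { SCALE, NOMINAL, ORDINAL, DUMMYCODED, INVALID }

    // Mirrors TransformationAgent.columnTypeToID(), but throws an unchecked
    // exception so this sketch stays self-contained.
    static byte columnTypeToID(ColumnTypes type) {
        switch (type) {
            case SCALE:      return 1;
            case NOMINAL:    return 2;
            case ORDINAL:    return 3;
            case DUMMYCODED: return 1; // same id as SCALE; semantically lossy, per the comment above
            default: throw new IllegalArgumentException("Invalid Column Type: " + type);
        }
    }

    public static void main(String[] args) {
        for (ColumnTypes t : new ColumnTypes[] { ColumnTypes.SCALE, ColumnTypes.NOMINAL,
                ColumnTypes.ORDINAL, ColumnTypes.DUMMYCODED }) {
            System.out.println(t + " -> " + columnTypeToID(t)); // DUMMYCODED prints 1, like SCALE
        }
    }
}

Because SCALE and DUMMYCODED share id 1, the ids alone are not invertible: a consumer of coltypes.csv cannot distinguish a scale column from a dummycoded one.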

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/standalone/runStandaloneSystemML.bat
----------------------------------------------------------------------
diff --git a/src/main/standalone/runStandaloneSystemML.bat b/src/main/standalone/runStandaloneSystemML.bat
index aba2002..f837970 100644
--- a/src/main/standalone/runStandaloneSystemML.bat
+++ b/src/main/standalone/runStandaloneSystemML.bat
@@ -1,50 +1,50 @@
-::-------------------------------------------------------------
-::
-:: Licensed to the Apache Software Foundation (ASF) under one
-:: or more contributor license agreements.  See the NOTICE file
-:: distributed with this work for additional information
-:: regarding copyright ownership.  The ASF licenses this file
-:: to you under the Apache License, Version 2.0 (the
-:: "License"); you may not use this file except in compliance
-:: with the License.  You may obtain a copy of the License at
-:: 
-::   http://www.apache.org/licenses/LICENSE-2.0
-:: 
-:: Unless required by applicable law or agreed to in writing,
-:: software distributed under the License is distributed on an
-:: "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-:: KIND, either express or implied.  See the License for the
-:: specific language governing permissions and limitations
-:: under the License.
-::
-::-------------------------------------------------------------
-
-@ECHO OFF
-
-IF "%~1" == ""  GOTO Err
-IF "%~1" == "-help" GOTO Msg
-IF "%~1" == "-h" GOTO Msg
-
-setLocal EnableDelayedExpansion
-
-SET HADOOP_HOME=%CD%/lib/hadoop
-
-set CLASSPATH=./lib/*
-echo !CLASSPATH!
-
-set LOG4JPROP=log4j.properties
-
-for /f "tokens=1,* delims= " %%a in ("%*") do set ALLBUTFIRST=%%b
-
-java -Xmx4g -Xms4g -Xmn400m -cp %CLASSPATH% -Dlog4j.configuration=file:%LOG4JPROP% org.apache.sysml.api.DMLScript -f %1 -exec singlenode -config=SystemML-config.xml %ALLBUTFIRST%
-GOTO End
-
-:Err
-ECHO "Wrong Usage. Please provide DML filename to be executed."
-GOTO Msg
-
-:Msg
-ECHO "Usage: runStandaloneSystemML.bat <dml-filename> [arguments] [-help]"
-ECHO "Script internally invokes 'java -Xmx4g -Xms4g -Xmn400m -jar 
jSystemML.jar -f <dml-filename> -exec singlenode -config=SystemML-config.xml 
[Optional-Arguments]'"
-
-:End
+::-------------------------------------------------------------
+::
+:: Licensed to the Apache Software Foundation (ASF) under one
+:: or more contributor license agreements.  See the NOTICE file
+:: distributed with this work for additional information
+:: regarding copyright ownership.  The ASF licenses this file
+:: to you under the Apache License, Version 2.0 (the
+:: "License"); you may not use this file except in compliance
+:: with the License.  You may obtain a copy of the License at
+:: 
+::   http://www.apache.org/licenses/LICENSE-2.0
+:: 
+:: Unless required by applicable law or agreed to in writing,
+:: software distributed under the License is distributed on an
+:: "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+:: KIND, either express or implied.  See the License for the
+:: specific language governing permissions and limitations
+:: under the License.
+::
+::-------------------------------------------------------------
+
+@ECHO OFF
+
+IF "%~1" == ""  GOTO Err
+IF "%~1" == "-help" GOTO Msg
+IF "%~1" == "-h" GOTO Msg
+
+setLocal EnableDelayedExpansion
+
+SET HADOOP_HOME=%CD%/lib/hadoop
+
+set CLASSPATH=./lib/*
+echo !CLASSPATH!
+
+set LOG4JPROP=log4j.properties
+
+for /f "tokens=1,* delims= " %%a in ("%*") do set ALLBUTFIRST=%%b
+
+java -Xmx4g -Xms4g -Xmn400m -cp %CLASSPATH% -Dlog4j.configuration=file:%LOG4JPROP% org.apache.sysml.api.DMLScript -f %1 -exec singlenode -config=SystemML-config.xml %ALLBUTFIRST%
+GOTO End
+
+:Err
+ECHO "Wrong Usage. Please provide DML filename to be executed."
+GOTO Msg
+
+:Msg
+ECHO "Usage: runStandaloneSystemML.bat <dml-filename> [arguments] [-help]"
+ECHO "Script internally invokes 'java -Xmx4g -Xms4g -Xmn400m -jar 
jSystemML.jar -f <dml-filename> -exec singlenode -config=SystemML-config.xml 
[Optional-Arguments]'"
+
+:End
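
For illustration, a typical invocation might look like the following (the script name and the named arguments are hypothetical). The for /f loop above strips the first token, so everything after the DML file name is forwarded to DMLScript unchanged via %ALLBUTFIRST%:

runStandaloneSystemML.bat myScript.dml -nvargs X=data/in.csv Y=data/out.csv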

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/test/scripts/applications/apply-transform/apply-transform.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/apply-transform/apply-transform.dml b/src/test/scripts/applications/apply-transform/apply-transform.dml
index de7fa02..fdd85c7 100644
--- a/src/test/scripts/applications/apply-transform/apply-transform.dml
+++ b/src/test/scripts/applications/apply-transform/apply-transform.dml
@@ -1,156 +1,156 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
-cmdLine_bin_defns = ifdef($bin_defns, " ")
-cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
-cmdLine_normalization_maps = ifdef($normalization_maps, " ")
-
-original_X = read($X)
-
-if(cmdLine_missing_value_maps != " "){
-       missing_val_maps = read(cmdLine_missing_value_maps)
-
-       last_data_col = ncol(original_X)-nrow(missing_val_maps)
-       X = original_X[,1:last_data_col]
-}else
-       X = original_X
-
-# col 1: col index of missing indicator col
-#               0 otherwise
-# col 2: global mean if imputation is needed
-# col 3: num_bins if binning is required
-# col 4: bin width if binning is required
-# col 5: min val if binning is required
-# col 6: begin col if dummy coding is required
-# col 7: end col if dummy coding is required
-# col 8: 1 if normalization is required 0 ow
-# col 9: mean for normalization
-# col 10: std for z-scoring for normalization
-#               -1 indicates mean subtraction  
-attrinfo = matrix(0, rows=ncol(X), cols=10)
-
-if(cmdLine_missing_value_maps != " "){
-       missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
-       
-       parfor(i in 1:nrow(missing_val_maps), check=0){
-               attr_index_mv = castAsScalar(missing_val_maps[i,1])
-               attrinfo[attr_index_mv,1] = i
-               attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
-       }       
-}
-       
-if(cmdLine_bin_defns != " "){
-       bin_defns = read(cmdLine_bin_defns)
-       parfor(i in 1:nrow(bin_defns), check=0){
-               attr_index_bin = castAsScalar(bin_defns[i,1])
-               attrinfo[attr_index_bin,3] = bin_defns[i,4]
-               attrinfo[attr_index_bin,4] = bin_defns[i,2]
-               attrinfo[attr_index_bin,5] = bin_defns[i,3]
-       }
-}
-
-if(cmdLine_dummy_code_maps != " "){
-       dummy_code_maps = read(cmdLine_dummy_code_maps)
-       parfor(i in 1:nrow(dummy_code_maps), check=0){
-               attr_index_dc = castAsScalar(dummy_code_maps[i,1])
-               attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
-               attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
-       }
-}else{
-       attrinfo[,6] = seq(1, ncol(X), 1)
-       attrinfo[,7] = seq(1, ncol(X), 1)
-}
-
-if(cmdLine_normalization_maps != " "){
-       normalization_map = read(cmdLine_normalization_maps)
-       parfor(i in 1:nrow(normalization_map), check=0){
-               attr_index_normalization = castAsScalar(normalization_map[i,1])
-               attrinfo[attr_index_normalization,8] = 1
-               attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
-               attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
-       }
-}
-
-#write(attrinfo, "binning/attrinfo.mtx", format="csv")
-
-cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
-new_X = matrix(0, rows=nrow(X), cols=cols_in_transformed_X)
-log = matrix(0, rows=ncol(X), cols=2)
-parfor(i in 1:ncol(X), check=0){
-       col = X[,i]
-       
-       mv_col_id = castAsScalar(attrinfo[i,1])
-       global_mean = castAsScalar(attrinfo[i,2])
-       num_bins = castAsScalar(attrinfo[i,3])
-       bin_width = castAsScalar(attrinfo[i,4])
-       min_val = castAsScalar(attrinfo[i,5])
-       dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
-       dummy_coding_end_col = castAsScalar(attrinfo[i,7])
-       normalization_needed = castAsScalar(attrinfo[i,8])
-       normalization_mean = castAsScalar(attrinfo[i,9])
-       normalization_std = castAsScalar(attrinfo[i,10])
-       
-       if(mv_col_id > 0){ 
-               # fill-in with global mean
-               col = col + missing_indicator_mat[,mv_col_id] * global_mean
-       }
-       
-       if(num_bins > 0){
-               # only for equiwidth bins
-       
-               # note that max_val entries will get assigned num_bins+1
-               col = round((col - min_val)/bin_width - 0.5) + 1
-               less_than_lb = ppred(col, 1, "<")
-               more_than_ub = ppred(col, num_bins, ">")
-               
-               col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
-       }
-
-       if(dummy_coding_beg_col == dummy_coding_end_col){
-               if(normalization_needed == 1){
-                       if(normalization_std == -1) col = col - normalization_mean
-                       else col = (col - normalization_mean)/normalization_std
-               }
-               
-               new_X[,dummy_coding_beg_col] = col
-       }else{
-               min_val = min(col)
-               max_val = max(col)
-               if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1){
-                       res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
-                       new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
-               }else{
-                       log[i,1] = 1
-                       if(min_val < 1) log[i,2] = min_val
-                       else log[i,2] = max_val
-               }
-       }
-}
-
-write(new_X, $transformed_X, format="text")
-
-s = "Warning Messages"
-for(i in 1:nrow(log)){
-       if(castAsScalar(log[i,1]) == 1)
-               s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
-}
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
+cmdLine_bin_defns = ifdef($bin_defns, " ")
+cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
+cmdLine_normalization_maps = ifdef($normalization_maps, " ")
+
+original_X = read($X)
+
+if(cmdLine_missing_value_maps != " "){
+       missing_val_maps = read(cmdLine_missing_value_maps)
+
+       last_data_col = ncol(original_X)-nrow(missing_val_maps)
+       X = original_X[,1:last_data_col]
+}else
+       X = original_X
+
+# col 1: col index of missing indicator col
+#               0 otherwise
+# col 2: global mean if imputation is needed
+# col 3: num_bins if binning is required
+# col 4: bin width if binning is required
+# col 5: min val if binning is required
+# col 6: begin col if dummy coding is required
+# col 7: end col if dummy coding is required
+# col 8: 1 if normalization is required 0 ow
+# col 9: mean for normalization
+# col 10: std for z-scoring for normalization
+#               -1 indicates mean subtraction  
+attrinfo = matrix(0, rows=ncol(X), cols=10)
+
+if(cmdLine_missing_value_maps != " "){
+       missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
+       
+       parfor(i in 1:nrow(missing_val_maps), check=0){
+               attr_index_mv = castAsScalar(missing_val_maps[i,1])
+               attrinfo[attr_index_mv,1] = i
+               attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
+       }       
+}
+       
+if(cmdLine_bin_defns != " "){
+       bin_defns = read(cmdLine_bin_defns)
+       parfor(i in 1:nrow(bin_defns), check=0){
+               attr_index_bin = castAsScalar(bin_defns[i,1])
+               attrinfo[attr_index_bin,3] = bin_defns[i,4]
+               attrinfo[attr_index_bin,4] = bin_defns[i,2]
+               attrinfo[attr_index_bin,5] = bin_defns[i,3]
+       }
+}
+
+if(cmdLine_dummy_code_maps != " "){
+       dummy_code_maps = read(cmdLine_dummy_code_maps)
+       parfor(i in 1:nrow(dummy_code_maps), check=0){
+               attr_index_dc = castAsScalar(dummy_code_maps[i,1])
+               attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
+               attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
+       }
+}else{
+       attrinfo[,6] = seq(1, ncol(X), 1)
+       attrinfo[,7] = seq(1, ncol(X), 1)
+}
+
+if(cmdLine_normalization_maps != " "){
+       normalization_map = read(cmdLine_normalization_maps)
+       parfor(i in 1:nrow(normalization_map), check=0){
+               attr_index_normalization = castAsScalar(normalization_map[i,1])
+               attrinfo[attr_index_normalization,8] = 1
+               attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
+               attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
+       }
+}
+
+#write(attrinfo, "binning/attrinfo.mtx", format="csv")
+
+cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
+new_X = matrix(0, rows=nrow(X), cols=cols_in_transformed_X)
+log = matrix(0, rows=ncol(X), cols=2)
+parfor(i in 1:ncol(X), check=0){
+       col = X[,i]
+       
+       mv_col_id = castAsScalar(attrinfo[i,1])
+       global_mean = castAsScalar(attrinfo[i,2])
+       num_bins = castAsScalar(attrinfo[i,3])
+       bin_width = castAsScalar(attrinfo[i,4])
+       min_val = castAsScalar(attrinfo[i,5])
+       dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
+       dummy_coding_end_col = castAsScalar(attrinfo[i,7])
+       normalization_needed = castAsScalar(attrinfo[i,8])
+       normalization_mean = castAsScalar(attrinfo[i,9])
+       normalization_std = castAsScalar(attrinfo[i,10])
+       
+       if(mv_col_id > 0){ 
+               # fill-in with global mean
+               col = col + missing_indicator_mat[,mv_col_id] * global_mean
+       }
+       
+       if(num_bins > 0){
+               # only for equiwidth bins
+       
+               # note that max_val entries will get assigned num_bins+1
+               col = round((col - min_val)/bin_width - 0.5) + 1
+               less_than_lb = ppred(col, 1, "<")
+               more_than_ub = ppred(col, num_bins, ">")
+               
+               col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
+       }
+
+       if(dummy_coding_beg_col == dummy_coding_end_col){
+               if(normalization_needed == 1){
+                       if(normalization_std == -1) col = col - normalization_mean
+                       else col = (col - normalization_mean)/normalization_std
+               }
+               
+               new_X[,dummy_coding_beg_col] = col
+       }else{
+               min_val = min(col)
+               max_val = max(col)
+               if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1){
+                       res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
+                       new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
+               }else{
+                       log[i,1] = 1
+                       if(min_val < 1) log[i,2] = min_val
+                       else log[i,2] = max_val
+               }
+       }
+}
+
+write(new_X, $transformed_X, format="text")
+
+s = "Warning Messages"
+for(i in 1:nrow(log)){
+       if(castAsScalar(log[i,1]) == 1)
+               s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
+}
 write(s, $Log)
\ No newline at end of file
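
A minimal Java sketch (not part of the commit; min_val, bin_width, num_bins and the sample values are illustrative) of the equiwidth binning arithmetic in the script above: bin = round((v - min_val)/bin_width - 0.5) + 1, with the two ppred() lines then clamping out-of-range results to [1, num_bins]. Note the script's own comment that entries equal to the maximum initially land in bin num_bins+1 before clamping.

public class EquiwidthBinDemo {
    public static void main(String[] args) {
        double minVal = 0.0, binWidth = 10.0;  // illustrative transform-metadata parameters
        int numBins = 5;
        for (double v : new double[] { -3.0, 0.0, 9.9, 10.0, 49.9, 50.0, 73.0 }) {
            // bin = round((v - min_val)/bin_width - 0.5) + 1
            long bin = Math.round((v - minVal) / binWidth - 0.5) + 1;
            // mirrors the ppred()-based clamp to [1, num_bins]
            bin = Math.max(1, Math.min(numBins, bin));
            System.out.println(v + " -> bin " + bin);
        }
    }
}

For example, 9.9 falls in bin 1 and 10.0 in bin 2 (bins are left-closed), while 50.0 and 73.0 clamp to bin 5.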

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/test/scripts/applications/apply-transform/apply-transform.pydml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/apply-transform/apply-transform.pydml b/src/test/scripts/applications/apply-transform/apply-transform.pydml
index be04495..f6c40dd 100644
--- a/src/test/scripts/applications/apply-transform/apply-transform.pydml
+++ b/src/test/scripts/applications/apply-transform/apply-transform.pydml
@@ -1,146 +1,146 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#   http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
-cmdLine_bin_defns = ifdef($bin_defns, " ")
-cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
-cmdLine_normalization_maps = ifdef($normalization_maps, " ")
-
-original_X = load($X)
-
-if(cmdLine_missing_value_maps != " "):
-    missing_val_maps = read(cmdLine_missing_value_maps)
-
-    last_data_col = ncol(original_X)-nrow(missing_val_maps)
-    X = original_X[,1:last_data_col]
-else:
-    X = original_X
-
-# col 1: col index of missing indicator col
-#         0 otherwise
-# col 2: global mean if imputation is needed
-# col 3: num_bins if binning is required
-# col 4: bin width if binning is required
-# col 5: min val if binning is required
-# col 6: begin col if dummy coding is required
-# col 7: end col if dummy coding is required
-# col 8: 1 if normalization is required 0 ow
-# col 9: mean for normalization
-# col 10: std for z-scoring for normalization
-#         -1 indicates mean subtraction  
-attrinfo = full(0, rows=ncol(X), cols=10)
-
-if(cmdLine_missing_value_maps != " "):
-    missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
-    
-    parfor(i in 1:nrow(missing_val_maps), check=0):
-        attr_index_mv = castAsScalar(missing_val_maps[i,1])
-        attrinfo[attr_index_mv,1] = i
-        attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
-    
-if(cmdLine_bin_defns != " "):
-    bin_defns = read(cmdLine_bin_defns)
-    parfor(i in 1:nrow(bin_defns), check=0):
-        attr_index_bin = castAsScalar(bin_defns[i,1])
-        attrinfo[attr_index_bin,3] = bin_defns[i,4]
-        attrinfo[attr_index_bin,4] = bin_defns[i,2]
-        attrinfo[attr_index_bin,5] = bin_defns[i,3]
-
-if(cmdLine_dummy_code_maps != " "):
-    dummy_code_maps = read(cmdLine_dummy_code_maps)
-    parfor(i in 1:nrow(dummy_code_maps), check=0):
-        attr_index_dc = castAsScalar(dummy_code_maps[i,1])
-        attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
-        attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
-else:
-    attrinfo[,6] = seq(1, ncol(X), 1)
-    attrinfo[,7] = seq(1, ncol(X), 1)
-
-if(cmdLine_normalization_maps != " "):
-    normalization_map = read(cmdLine_normalization_maps)
-    parfor(i in 1:nrow(normalization_map), check=0):
-        attr_index_normalization = castAsScalar(normalization_map[i,1])
-        attrinfo[attr_index_normalization,8] = 1
-        attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
-        attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
-
-#write(attrinfo, "binning/attrinfo.mtx", format="csv")
-
-cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
-new_X = full(0, rows=nrow(X), cols=cols_in_transformed_X)
-log = full(0, rows=ncol(X), cols=2)
-parfor(i in 1:ncol(X), check=0):
-    col = X[,i]
-    
-    mv_col_id = castAsScalar(attrinfo[i,1])
-    global_mean = castAsScalar(attrinfo[i,2])
-    num_bins = castAsScalar(attrinfo[i,3])
-    bin_width = castAsScalar(attrinfo[i,4])
-    min_val = castAsScalar(attrinfo[i,5])
-    dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
-    dummy_coding_end_col = castAsScalar(attrinfo[i,7])
-    normalization_needed = castAsScalar(attrinfo[i,8])
-    normalization_mean = castAsScalar(attrinfo[i,9])
-    normalization_std = castAsScalar(attrinfo[i,10])
-    
-    if(mv_col_id > 0):
-        # fill-in with global mean
-        col = col + missing_indicator_mat[,mv_col_id] * global_mean
-    
-    if(num_bins > 0):
-        # only for equiwidth bins
-    
-        # note that max_val entries will get assigned num_bins+1
-        col = round((col - min_val)/bin_width - 0.5) + 1
-        less_than_lb = ppred(col, 1, "<")
-        more_than_ub = ppred(col, num_bins, ">")
-        
-        col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
-
-    if(dummy_coding_beg_col == dummy_coding_end_col):
-        if(normalization_needed == 1):
-            if(normalization_std == -1):
-                col = col - normalization_mean
-            else:
-                col = (col - normalization_mean)/normalization_std
-        
-        new_X[,dummy_coding_beg_col] = col
-    else:
-        min_val = min(col)
-        max_val = max(col)
-        if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1):
-            res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
-            new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
-        else:
-            log[i,1] = 1
-            if(min_val < 1):
-                log[i,2] = min_val
-            else:
-                log[i,2] = max_val
-
-save(new_X, $transformed_X, format="text")
-
-s = "Warning Messages"
-for(i in 1:nrow(log)):
-    if(castAsScalar(log[i,1]) == 1):
-        s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
-
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+cmdLine_missing_value_maps = ifdef($missing_value_maps, " ")
+cmdLine_bin_defns = ifdef($bin_defns, " ")
+cmdLine_dummy_code_maps = ifdef($dummy_code_maps, " ")
+cmdLine_normalization_maps = ifdef($normalization_maps, " ")
+
+original_X = load($X)
+
+if(cmdLine_missing_value_maps != " "):
+    missing_val_maps = read(cmdLine_missing_value_maps)
+
+    last_data_col = ncol(original_X)-nrow(missing_val_maps)
+    X = original_X[,1:last_data_col]
+else:
+    X = original_X
+
+# col 1: col index of missing indicator col
+#         0 otherwise
+# col 2: global mean if imputation is needed
+# col 3: num_bins if binning is required
+# col 4: bin width if binning is required
+# col 5: min val if binning is required
+# col 6: begin col if dummy coding is required
+# col 7: end col if dummy coding is required
+# col 8: 1 if normalization is required 0 ow
+# col 9: mean for normalization
+# col 10: std for z-scoring for normalization
+#         -1 indicates mean subtraction  
+attrinfo = full(0, rows=ncol(X), cols=10)
+
+if(cmdLine_missing_value_maps != " "):
+    missing_indicator_mat = original_X[,(last_data_col+1):ncol(original_X)]
+    
+    parfor(i in 1:nrow(missing_val_maps), check=0):
+        attr_index_mv = castAsScalar(missing_val_maps[i,1])
+        attrinfo[attr_index_mv,1] = i
+        attrinfo[attr_index_mv,2] = missing_val_maps[i,2]
+    
+if(cmdLine_bin_defns != " "):
+    bin_defns = read(cmdLine_bin_defns)
+    parfor(i in 1:nrow(bin_defns), check=0):
+        attr_index_bin = castAsScalar(bin_defns[i,1])
+        attrinfo[attr_index_bin,3] = bin_defns[i,4]
+        attrinfo[attr_index_bin,4] = bin_defns[i,2]
+        attrinfo[attr_index_bin,5] = bin_defns[i,3]
+
+if(cmdLine_dummy_code_maps != " "):
+    dummy_code_maps = read(cmdLine_dummy_code_maps)
+    parfor(i in 1:nrow(dummy_code_maps), check=0):
+        attr_index_dc = castAsScalar(dummy_code_maps[i,1])
+        attrinfo[attr_index_dc,6] = dummy_code_maps[i,2]
+        attrinfo[attr_index_dc,7] = dummy_code_maps[i,3]
+else:
+    attrinfo[,6] = seq(1, ncol(X), 1)
+    attrinfo[,7] = seq(1, ncol(X), 1)
+
+if(cmdLine_normalization_maps != " "):
+    normalization_map = read(cmdLine_normalization_maps)
+    parfor(i in 1:nrow(normalization_map), check=0):
+        attr_index_normalization = castAsScalar(normalization_map[i,1])
+        attrinfo[attr_index_normalization,8] = 1
+        attrinfo[attr_index_normalization,9] = castAsScalar(normalization_map[i,2])
+        attrinfo[attr_index_normalization,10] = castAsScalar(normalization_map[i,3])
+
+#write(attrinfo, "binning/attrinfo.mtx", format="csv")
+
+cols_in_transformed_X = castAsScalar(attrinfo[nrow(attrinfo),6])
+new_X = full(0, rows=nrow(X), cols=cols_in_transformed_X)
+log = full(0, rows=ncol(X), cols=2)
+parfor(i in 1:ncol(X), check=0):
+    col = X[,i]
+    
+    mv_col_id = castAsScalar(attrinfo[i,1])
+    global_mean = castAsScalar(attrinfo[i,2])
+    num_bins = castAsScalar(attrinfo[i,3])
+    bin_width = castAsScalar(attrinfo[i,4])
+    min_val = castAsScalar(attrinfo[i,5])
+    dummy_coding_beg_col = castAsScalar(attrinfo[i,6])
+    dummy_coding_end_col = castAsScalar(attrinfo[i,7])
+    normalization_needed = castAsScalar(attrinfo[i,8])
+    normalization_mean = castAsScalar(attrinfo[i,9])
+    normalization_std = castAsScalar(attrinfo[i,10])
+    
+    if(mv_col_id > 0):
+        # fill-in with global mean
+        col = col + missing_indicator_mat[,mv_col_id] * global_mean
+    
+    if(num_bins > 0):
+        # only for equiwidth bins
+    
+        # note that max_val entries will get assigned num_bins+1
+        col = round((col - min_val)/bin_width - 0.5) + 1
+        less_than_lb = ppred(col, 1, "<")
+        more_than_ub = ppred(col, num_bins, ">")
+        
+        col = (1 - less_than_lb - more_than_ub)*col + more_than_ub*num_bins + less_than_lb
+
+    if(dummy_coding_beg_col == dummy_coding_end_col):
+        if(normalization_needed == 1):
+            if(normalization_std == -1):
+                col = col - normalization_mean
+            else:
+                col = (col - normalization_mean)/normalization_std
+        
+        new_X[,dummy_coding_beg_col] = col
+    else:
+        min_val = min(col)
+        max_val = max(col)
+        if(min_val >= 1 & max_val <= dummy_coding_end_col - dummy_coding_beg_col + 1):
+            res = table(seq(1, nrow(X), 1), col, nrow(X), (dummy_coding_end_col-dummy_coding_beg_col+1))
+            new_X[,dummy_coding_beg_col:dummy_coding_end_col] = res
+        else:
+            log[i,1] = 1
+            if(min_val < 1):
+                log[i,2] = min_val
+            else:
+                log[i,2] = max_val
+
+save(new_X, $transformed_X, format="text")
+
+s = "Warning Messages"
+for(i in 1:nrow(log)):
+    if(castAsScalar(log[i,1]) == 1):
+        s = append(s, "Unseen value in column " + i + " (" + castAsScalar(log[i,2]) + ")")
+
 save(s, $Log)
\ No newline at end of file
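
The dummy-coding branch in both scripts relies on table(seq(1, nrow(X), 1), col, nrow(X), k) to build an n x k indicator matrix with a single 1 per row. A minimal Java sketch (not part of the commit; the category ids and k are illustrative) of that one-hot construction:

import java.util.Arrays;

public class DummyCodeDemo {
    public static void main(String[] args) {
        int[] col = { 2, 1, 3, 2 };  // illustrative recoded (1-based) category ids, one per row
        int k = 3;                   // dummy_coding_end_col - dummy_coding_beg_col + 1
        double[][] res = new double[col.length][k];
        for (int i = 0; i < col.length; i++) {
            res[i][col[i] - 1] = 1.0;  // row i gets a single 1 in column col[i]
        }
        for (double[] row : res) {
            System.out.println(Arrays.toString(row));
        }
    }
}

This also shows why the scripts check min_val >= 1 and max_val <= k first: any category id outside [1, k] has no target column, so the row is logged as an unseen value instead.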
