http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
deleted file mode 100644
index 4a7839d..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
+++ /dev/null
@@ -1,1496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.conf.DMLConfig;
-import org.apache.sysml.lops.CSVReBlock;
-import org.apache.sysml.lops.Lop;
-import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
-import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
-import org.apache.sysml.runtime.instructions.InstructionParser;
-import org.apache.sysml.runtime.instructions.MRJobInstruction;
-import org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
-import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
-import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.AssignRowIDMRReturn;
-import org.apache.sysml.runtime.matrix.JobReturn;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.util.MapReduceTool;
-import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.sysml.utils.JSONHelper;
-
-public class DataTransform 
-{
-       private static final String ERROR_MSG_ZERO_ROWS = "Number of rows in the transformed output (potentially, after omitting the ones with missing values) is zero. Cannot proceed.";
-
-       
-       /**
-        * Method to read the header line from the input data file.
-        * 
-        * @param fs file system
-        * @param prop csv file format properties
-        * @param smallestFile file name
-        * @return header line
-        * @throws IOException if IOException occurs
-        */
-       private static String readHeaderLine(FileSystem fs, CSVFileFormatProperties prop, String smallestFile) throws IOException {
-               String line = null;
-               try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(smallestFile)))) ) {
-                       line = br.readLine();
-               }
-               if(prop.hasHeader()) {
-                       ; // nothing here
-               }
-               else 
-               {
-                       // construct header with default column names, V1, V2, etc.
-                       int ncol = Pattern.compile( Pattern.quote(prop.getDelim()) ).split(line, -1).length;
-                       line = null;
-                       
-                       StringBuilder sb = new StringBuilder();
-                       sb.append("V1");
-                       for(int i=2; i <= ncol; i++)
-                               sb.append(prop.getDelim() + "V" + i);
-                       line = sb.toString();
-               }
-               return line;
-       }
-       
-       /**
-        * Method to construct a mapping between column names and their
-        * corresponding column IDs. The mapping is used to prepare the
-        * specification file in <code>processSpecFile()</code>.
-        * 
-        * @param fs file system
-        * @param prop csv file format properties
-        * @param headerLine header line
-        * @param smallestFile file name
-        * @return map of column names and their column IDs
-        * @throws IllegalArgumentException if IllegalArgumentException occurs
-        * @throws IOException if IOException occurs
-        */
-       private static HashMap<String, Integer> processColumnNames(FileSystem fs, CSVFileFormatProperties prop, String headerLine, String smallestFile) throws IllegalArgumentException, IOException {
-               HashMap<String, Integer> colNames = new HashMap<String,Integer>();
-               
-               String escapedDelim = Pattern.quote(prop.getDelim());
-               Pattern compiledDelim = Pattern.compile(escapedDelim);
-               String[] names = compiledDelim.split(headerLine, -1);
-                       
-               for(int i=0; i< names.length; i++)
-                       colNames.put(UtilFunctions.unquote(names[i].trim()), i+1);
-
-               return colNames;
-       }
-       
-       /**
-        * In-place permutation of list, mthd, and cst arrays based on indices,
-        * by navigating through cycles in the permutation. 
-        * 
-        * @param list ?
-        * @param mthd ?
-        * @param cst ?
-        * @param indices ?
-        */
-       private static void inplacePermute(int[] list, byte[] mthd, Object[] cst, Integer[] indices)
-       {
-               int x;
-               byte xb = 0;
-               Object xo = null;
-               
-               int j, k;
-               for(int i=0; i < list.length; i++) 
-               {
-                   x = list[i];
-                   xb = mthd[i];
-                   if ( cst != null )  xo = cst[i];
-                   
-                   j = i;
-                   while(true) {
-                       k = indices[j];
-                       indices[j] = j;
-                       
-                       if (k == i)
-                           break;
-                       
-                       list[j] = list[k];
-                       mthd[j] = mthd[k]; 
-                       if ( cst != null )  cst[j] = cst[k]; 
-                       j = k;
-                   }
-                   list[j] = x;
-               mthd[j] = xb; 
-               if ( cst != null )  cst[j] = xo; 
-               }
-
-       }
-       
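
The cycle-following idea used by inplacePermute above can be illustrated with a minimal, self-contained sketch; the PermuteDemo class, arrays, and values below are hypothetical and only mirror the int/byte case without the Object[] constants:

import java.util.Arrays;

public class PermuteDemo {
    // Apply the permutation given by indices to list and mthd in place,
    // following each cycle until it returns to its starting position.
    static void inplacePermute(int[] list, byte[] mthd, Integer[] indices) {
        for (int i = 0; i < list.length; i++) {
            int x = list[i];      // values displaced from position i
            byte xb = mthd[i];
            int j = i, k;
            while ((k = indices[j]) != i) {
                indices[j] = j;   // mark position j as settled
                list[j] = list[k];
                mthd[j] = mthd[k];
                j = k;
            }
            indices[j] = j;
            list[j] = x;          // close the cycle with the remembered values
            mthd[j] = xb;
        }
    }

    public static void main(String[] args) {
        int[] cols = {7, 2, 5};
        byte[] mthds = {1, 2, 3};
        Integer[] idx = {1, 2, 0}; // sorted order of cols: positions 1, 2, 0
        inplacePermute(cols, mthds, idx);
        System.out.println(Arrays.toString(cols));  // [2, 5, 7]
        System.out.println(Arrays.toString(mthds)); // [2, 3, 1]
    }
}
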
-       /**
-        * Convert input transformation specification file with column names into a
-        * specification with corresponding column Ids. This file is sent to all the
-        * relevant MR jobs.
-        * 
-        * @param fs file system
-        * @param inputPath input file path
-        * @param smallestFile file name
-        * @param colNames column names
-        * @param prop csv file format properties
-        * @param specWithNames ?
-        * @return specification as a JSONObject
-        * @throws IllegalArgumentException if IllegalArgumentException occurs
-        * @throws IOException if IOException occurs
-        * @throws JSONException if JSONException occurs
-        */
-       private static String processSpecFile(FileSystem fs, String inputPath, String smallestFile, HashMap<String,Integer> colNames, CSVFileFormatProperties prop, String specWithNames) throws IllegalArgumentException, IOException, JSONException {
-               JSONObject inputSpec = new JSONObject(specWithNames);
-               
-               final String NAME = "name";
-               final String ID = "id";
-               final String METHOD = "method";
-               final String VALUE = "value";
-               final String MV_METHOD_MEAN = "global_mean";
-               final String MV_METHOD_MODE = "global_mode";
-               final String MV_METHOD_CONSTANT = "constant";
-               final String BIN_METHOD_WIDTH = "equi-width";
-               final String BIN_METHOD_HEIGHT = "equi-height";
-               final String SCALE_METHOD_Z = "z-score";
-               final String SCALE_METHOD_M = "mean-subtraction";
-               final String JSON_BYPOS = "ids";
-               
-               String stmp = null;
-               JSONObject entry = null;
-               byte btmp = 0;
-               
-               final int[] mvList;
-               int[] rcdList, dcdList, omitList;
-               final int[] binList;
-               final int[] scaleList;
-               byte[] mvMethods = null, binMethods=null, scaleMethods=null;
-               Object[] numBins = null;
-               Object[] mvConstants = null;
-               
-               boolean byPositions = (inputSpec.containsKey(JSON_BYPOS) && ((Boolean)inputSpec.get(JSON_BYPOS)).booleanValue() == true);
-               
-               // --------------------------------------------------------------------------
-               // Omit
-               if( inputSpec.containsKey(TfUtils.TXMETHOD_OMIT) ) {
-                       JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_OMIT);
-                       omitList = new int[arrtmp.size()];
-                       for(int i=0; i<arrtmp.size(); i++) {
-                               if(byPositions)
-                                       omitList[i] = UtilFunctions.toInt( arrtmp.get(i) );
-                               else {
-                                       stmp = UtilFunctions.unquote( (String)arrtmp.get(i) );
-                                       omitList[i] = colNames.get(stmp);
-                               }
-                       }
-                       Arrays.sort(omitList);
-               }
-               else
-                       omitList = null;
-               // --------------------------------------------------------------------------
-               // Missing value imputation
-               if( inputSpec.containsKey(TfUtils.TXMETHOD_IMPUTE) ) {
-                       JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_IMPUTE);
-                       
-                       mvList = new int[arrtmp.size()];
-                       mvMethods = new byte[arrtmp.size()];
-                       mvConstants = new Object[arrtmp.size()];
-                       
-                       for(int i=0; i<arrtmp.size(); i++) {
-                               entry = (JSONObject)arrtmp.get(i);
-                               if (byPositions) {
-                                       mvList[i] = UtilFunctions.toInt(entry.get(ID));
-                               }
-                               else {
-                                       stmp = UtilFunctions.unquote((String) entry.get(NAME));
-                                       mvList[i] = colNames.get(stmp);
-                               }
-                               
-                               stmp = UtilFunctions.unquote((String) entry.get(METHOD));
-                               if(stmp.equals(MV_METHOD_MEAN))
-                                       btmp = (byte)1;
-                               else if ( stmp.equals(MV_METHOD_MODE))
-                                       btmp = (byte)2;
-                               else if ( stmp.equals(MV_METHOD_CONSTANT))
-                                       btmp = (byte)3;
-                               else
-                                       throw new IOException("Unknown missing value imputation method (" + stmp + ") in transformation specification: " + specWithNames);
-                               mvMethods[i] = btmp;
-                               
-                               //txMethods.add( btmp );
-                               
-                               mvConstants[i] = null;
-                               if ( entry.containsKey(VALUE) )
-                                       mvConstants[i] = entry.get(VALUE);
-                       }
-                       
-                       Integer[] idx = new Integer[mvList.length];
-                       for(int i=0; i < mvList.length; i++)
-                               idx[i] = i;
-                       Arrays.sort(idx, new Comparator<Integer>() {
-                               @Override
-                               public int compare(Integer o1, Integer o2) {
-                                       return (mvList[o1]-mvList[o2]);
-                               }
-                       });
-                       
-                       // rearrange mvList, mvMethods, and mvConstants according to permutation idx
-                       inplacePermute(mvList, mvMethods, mvConstants, idx);
-               }
-               else
-                       mvList = null;
-               // --------------------------------------------------------------------------
-               // Recoding
-               if( inputSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) {
-                       JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_RECODE);
-                       rcdList = new int[arrtmp.size()];
-                       for(int i=0; i<arrtmp.size(); i++) {
-                               if (byPositions)
-                                       rcdList[i] = UtilFunctions.toInt(arrtmp.get(i));
-                               else {
-                                       stmp = UtilFunctions.unquote( (String)arrtmp.get(i) );
-                                       rcdList[i] = colNames.get(stmp);
-                               }
-                       }
-                       Arrays.sort(rcdList);
-               }
-               else
-                       rcdList = null;
-               // --------------------------------------------------------------------------
-               // Binning
-               if( inputSpec.containsKey(TfUtils.TXMETHOD_BIN) ) {
-                       JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_BIN);
-                       
-                       binList = new int[arrtmp.size()];
-                       binMethods = new byte[arrtmp.size()];
-                       numBins = new Object[arrtmp.size()];
-                       
-                       for(int i=0; i<arrtmp.size(); i++) {
-                               entry = (JSONObject)arrtmp.get(i);
-                               
-                               if (byPositions) {
-                                       binList[i] = UtilFunctions.toInt(entry.get(ID));
-                               }
-                               else {
-                                       stmp = UtilFunctions.unquote((String) entry.get(NAME));
-                                       binList[i] = colNames.get(stmp);
-                               }
-                               stmp = UtilFunctions.unquote((String) entry.get(METHOD));
-                               if(stmp.equals(BIN_METHOD_WIDTH))
-                                       btmp = (byte)1;
-                               else if ( stmp.equals(BIN_METHOD_HEIGHT))
-                                       throw new IOException("Equi-height binning method is not yet supported, in transformation specification: " + specWithNames);
-                               else
-                                       throw new IOException("Unknown binning method (" + stmp + ") in transformation specification: " + specWithNames);
-                               binMethods[i] = btmp;
-                               
-                               numBins[i] = entry.get(TfUtils.JSON_NBINS);
-                               if ( ((Integer) numBins[i]).intValue() <= 1 ) 
-                                       throw new IllegalArgumentException("Invalid transformation on column \"" + (String) entry.get(NAME) + "\". Number of bins must be greater than 1.");
-                       }
-                       
-                       Integer[] idx = new Integer[binList.length];
-                       for(int i=0; i < binList.length; i++)
-                               idx[i] = i;
-                       Arrays.sort(idx, new Comparator<Integer>() {
-                               @Override
-                               public int compare(Integer o1, Integer o2) {
-                                       return (binList[o1]-binList[o2]);
-                               }
-                       });
-                       
-                       // rearrange binList and binMethods according to permutation idx
-                       inplacePermute(binList, binMethods, numBins, idx);
-               }
-               else
-                       binList = null;
-               // --------------------------------------------------------------------------
-               // Dummycoding
-               if( inputSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) {
-                       JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_DUMMYCODE);
-                       dcdList = new int[arrtmp.size()];
-                       for(int i=0; i<arrtmp.size(); i++) {
-                               if (byPositions)
-                                       dcdList[i] = UtilFunctions.toInt(arrtmp.get(i));
-                               else {
-                                       stmp = UtilFunctions.unquote( (String)arrtmp.get(i) );
-                                       dcdList[i] = colNames.get(stmp);
-                               }
-                       }
-                       Arrays.sort(dcdList);
-               }
-               else
-                       dcdList = null;
-               // --------------------------------------------------------------------------
-               // Scaling
-               if(inputSpec.containsKey(TfUtils.TXMETHOD_SCALE) ) {
-                       JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_SCALE);
-                       
-                       scaleList = new int[arrtmp.size()];
-                       scaleMethods = new byte[arrtmp.size()];
-                       
-                       for(int i=0; i<arrtmp.size(); i++) {
-                               entry = (JSONObject)arrtmp.get(i);
-                               
-                               if (byPositions) {
-                                       scaleList[i] = UtilFunctions.toInt(entry.get(ID));
-                               }
-                               else {
-                                       stmp = UtilFunctions.unquote((String) entry.get(NAME));
-                                       scaleList[i] = colNames.get(stmp);
-                               }
-                               stmp = UtilFunctions.unquote((String) entry.get(METHOD));
-                               if(stmp.equals(SCALE_METHOD_M))
-                                       btmp = (byte)1;
-                               else if ( stmp.equals(SCALE_METHOD_Z))
-                                       btmp = (byte)2;
-                               else
-                                       throw new IOException("Unknown scaling method (" + stmp + ") in transformation specification: " + specWithNames);
-                               scaleMethods[i] = btmp;
-                       }
-                       
-                       Integer[] idx = new Integer[scaleList.length];
-                       for(int i=0; i < scaleList.length; i++)
-                               idx[i] = i;
-                       Arrays.sort(idx, new Comparator<Integer>() {
-                               @Override
-                               public int compare(Integer o1, Integer o2) {
-                                       return (scaleList[o1]-scaleList[o2]);
-                               }
-                       });
-                       
-                       // rearrange scaleList and scaleMethods according to permutation idx
-                       inplacePermute(scaleList, scaleMethods, null, idx);
-               }
-               else
-                       scaleList = null;
-               // --------------------------------------------------------------------------
-               
-               // check for column IDs that are imputed with mode, but not recoded
-               // These columns have to be handled separately, because the computation of mode
-               // requires the computation of distinct values (i.e., recode maps)
-               ArrayList<Integer> tmpList = new ArrayList<Integer>();
-               if(mvList != null)
-               for(int i=0; i < mvList.length; i++) {
-                       int colID = mvList[i];
-                       if(mvMethods[i] == 2 && (rcdList == null || Arrays.binarySearch(rcdList, colID) < 0) )
-                               tmpList.add(colID);
-               }
-               
-               int[] mvrcdList = null;
-               if ( tmpList.size() > 0 ) {
-                       mvrcdList = new int[tmpList.size()];
-                       for(int i=0; i < tmpList.size(); i++)
-                               mvrcdList[i] = tmpList.get(i);
-               }
-               // Perform Validity Checks
-               
-               /*
-                             OMIT MVI RCD BIN DCD SCL
-                       OMIT     -  x   *   *   *   *
-                       MVI      x  -   *   *   *   *
-                       RCD      *  *   -   x   *   x
-                       BIN      *  *   x   -   *   x
-                       DCD      *  *   *   *   -   x
-                       SCL      *  *   x   x   x   -
-                */
-               
-               if(mvList != null)
-                       for(int i=0; i < mvList.length; i++) 
-                       {
-                               int colID = mvList[i];
-
-                               if ( omitList != null && Arrays.binarySearch(omitList, colID) >= 0 )
-                                       throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be both omitted and imputed.");
-                               
-                               if(mvMethods[i] == 1) 
-                               {
-                                       if ( rcdList != null && Arrays.binarySearch(rcdList, colID) >= 0 )
-                                               throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A numeric column can not be recoded.");
-                                       
-                                       if ( dcdList != null && Arrays.binarySearch(dcdList, colID) >= 0 )
-                                               // throw an error only if the column is not binned
-                                               if ( binList == null || Arrays.binarySearch(binList, colID) < 0 )
-                                                       throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A numeric column can not be dummycoded.");
-                               }
-                       }
-               
-               if(scaleList != null)
-               for(int i=0; i < scaleList.length; i++) 
-               {
-                       int colID = scaleList[i];
-                       if ( rcdList != null && Arrays.binarySearch(rcdList, colID) >= 0 )
-                               throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be recoded and scaled.");
-                       if ( binList != null && Arrays.binarySearch(binList, colID) >= 0 )
-                               throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be binned and scaled.");
-                       if ( dcdList != null && Arrays.binarySearch(dcdList, colID) >= 0 )
-                               throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be dummycoded and scaled.");
-               }
-               
-               if(rcdList != null)
-               for(int i=0; i < rcdList.length; i++) 
-               {
-                       int colID = rcdList[i];
-                       if ( binList != null && Arrays.binarySearch(binList, colID) >= 0 )
-                               throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be recoded and binned.");
-               }
-               
-               // Check if dummycoded columns are either recoded or binned.
-               // If not, add them to recode list.
-               ArrayList<Integer> addToRcd = new ArrayList<Integer>();
-               if(dcdList != null)
-               for(int i=0; i < dcdList.length; i++) 
-               {
-                       int colID = dcdList[i];
-                       boolean isRecoded = (rcdList != null && Arrays.binarySearch(rcdList, colID) >= 0);
-                       boolean isBinned = (binList != null && Arrays.binarySearch(binList, colID) >= 0);
-                       // If colID is neither recoded nor binned, then add it to rcdList.
-                       if ( !isRecoded && !isBinned )
-                               addToRcd.add(colID);
-               }
-               if ( addToRcd.size() > 0 ) 
-               {
-                       int[] newRcdList = null;
-                       if ( rcdList != null)  
-                               newRcdList = Arrays.copyOf(rcdList, rcdList.length + addToRcd.size());
-                       else
-                               newRcdList = new int[addToRcd.size()];
-                       
-                       int i = (rcdList != null ? rcdList.length : 0);
-                       for(int idx=0; i < newRcdList.length; i++, idx++)
-                               newRcdList[i] = addToRcd.get(idx);
-                       Arrays.sort(newRcdList);
-                       rcdList = newRcdList;
-               }
-               // -----------------------------------------------------------------------------
-               
-               // Prepare output spec
-               JSONObject outputSpec = new JSONObject();
-
-               if (omitList != null)
-               {
-                       JSONObject rcdSpec = new JSONObject();
-                       rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(omitList));
-                       outputSpec.put(TfUtils.TXMETHOD_OMIT, rcdSpec);
-               }
-               
-               if (mvList != null)
-               {
-                       JSONObject mvSpec = new JSONObject();
-                       mvSpec.put(TfUtils.JSON_ATTRS, toJSONArray(mvList));
-                       mvSpec.put(TfUtils.JSON_MTHD, toJSONArray(mvMethods));
-                       mvSpec.put(TfUtils.JSON_CONSTS, toJSONArray(mvConstants));
-                       outputSpec.put(TfUtils.TXMETHOD_IMPUTE, mvSpec);
-               }
-               
-               if (rcdList != null)
-               {
-                       JSONObject rcdSpec = new JSONObject();
-                       rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(rcdList));
-                       outputSpec.put(TfUtils.TXMETHOD_RECODE, rcdSpec);
-               }
-               
-               if (binList != null)
-               {
-                       JSONObject binSpec = new JSONObject();
-                       binSpec.put(TfUtils.JSON_ATTRS, toJSONArray(binList));
-                       binSpec.put(TfUtils.JSON_MTHD, toJSONArray(binMethods));
-                       binSpec.put(TfUtils.JSON_NBINS, toJSONArray(numBins));
-                       outputSpec.put(TfUtils.TXMETHOD_BIN, binSpec);
-               }
-               
-               if (dcdList != null)
-               {
-                       JSONObject dcdSpec = new JSONObject();
-                       dcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(dcdList));
-                       outputSpec.put(TfUtils.TXMETHOD_DUMMYCODE, dcdSpec);
-               }
-               
-               if (scaleList != null)
-               {
-                       JSONObject scaleSpec = new JSONObject();
-                       scaleSpec.put(TfUtils.JSON_ATTRS, toJSONArray(scaleList));
-                       scaleSpec.put(TfUtils.JSON_MTHD, toJSONArray(scaleMethods));
-                       outputSpec.put(TfUtils.TXMETHOD_SCALE, scaleSpec);
-               }
-               
-               if (mvrcdList != null)
-               {
-                       JSONObject mvrcd = new JSONObject();
-                       mvrcd.put(TfUtils.JSON_ATTRS, toJSONArray(mvrcdList));
-                       outputSpec.put(TfUtils.TXMETHOD_MVRCD, mvrcd);
-               }
-               
-               // return output spec with IDs
-               return outputSpec.toString();
-       }
-       
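
To make the name-to-ID translation in processSpecFile concrete, here is a minimal sketch using the same json4j classes; the SpecTranslationDemo class, column names, and the plain "recode" key are illustrative assumptions only, and the real output spec additionally groups IDs under internal keys such as TfUtils.JSON_ATTRS:

import java.util.HashMap;
import org.apache.wink.json4j.JSONArray;
import org.apache.wink.json4j.JSONException;
import org.apache.wink.json4j.JSONObject;

public class SpecTranslationDemo {
    public static void main(String[] args) throws JSONException {
        // user-facing spec referring to columns by name (hypothetical)
        JSONObject byName = new JSONObject("{\"recode\": [\"city\", \"gender\"]}");

        // header-derived mapping from column name to 1-based column ID
        HashMap<String, Integer> colNames = new HashMap<String, Integer>();
        colNames.put("id", 1); colNames.put("city", 2); colNames.put("age", 3);
        colNames.put("salary", 4); colNames.put("gender", 5);

        // translate names to IDs, since the MR jobs only understand column IDs
        JSONArray ids = new JSONArray();
        for (Object name : (JSONArray) byName.get("recode"))
            ids.add(colNames.get((String) name));

        JSONObject byId = new JSONObject();
        byId.put("recode", ids);
        System.out.println(byId.toString()); // {"recode":[2,5]}
    }
}
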
-       private static JSONArray toJSONArray(int[] list) 
-       {
-               JSONArray ret = new JSONArray(list.length);
-               for(int i=0; i < list.length; i++)
-                       ret.add(list[i]);
-               return ret;
-       }
-
-       private static JSONArray toJSONArray(byte[] list) 
-       {
-               JSONArray ret = new JSONArray(list.length);
-               for(int i=0; i < list.length; i++)
-                       ret.add(list[i]);
-               return ret;
-       }
-
-       private static JSONArray toJSONArray(Object[] list) 
-               throws JSONException 
-       {
-               return new JSONArray(list);
-       }
-       
-       /**
-        * Helper function to move transformation metadata files from a temporary
-        * location to a permanent location. These files (e.g., header before and
-        * after transformation) are generated by a single mapper, while applying
-        * data transformations. Note that these files must ultimately be placed
-        * under the existing metadata directory (txMtdPath), which is
-        * simultaneously read by other mappers. If they are not created at a
-        * temporary location, then MR tasks fail due to changing timestamps on
-        * txMtdPath.
-        * 
-        * @param fs file system
-        * @param tmpPath temporary location (directory) of transformation metadata files
-        * @param txMtdPath directory where to place transformation metadata files
-        * @throws IllegalArgumentException if IllegalArgumentException occurs
-        * @throws IOException if IOException occurs
-        */
-       private static void moveFilesFromTmp(FileSystem fs, String tmpPath, String txMtdPath) throws IllegalArgumentException, IOException
-       {
-               // move files from temporary location to txMtdPath
-               MapReduceTool.renameFileOnHDFS(tmpPath + "/" + TfUtils.TXMTD_COLNAMES, txMtdPath + "/" + TfUtils.TXMTD_COLNAMES);
-               MapReduceTool.renameFileOnHDFS(tmpPath + "/" + TfUtils.TXMTD_DC_COLNAMES, txMtdPath + "/" + TfUtils.TXMTD_DC_COLNAMES);
-               MapReduceTool.renameFileOnHDFS(tmpPath + "/" + TfUtils.TXMTD_COLTYPES, txMtdPath + "/" + TfUtils.TXMTD_COLTYPES);
-               
-               if ( fs.exists(new Path(tmpPath +"/Dummycode/" + TfUtils.DCD_FILE_NAME)) )
-               {
-                       if ( !fs.exists( new Path(txMtdPath + "/Dummycode/") )) 
-                               fs.mkdirs(new Path(txMtdPath + "/Dummycode/"));
-                       MapReduceTool.renameFileOnHDFS( tmpPath + "/Dummycode/" + TfUtils.DCD_FILE_NAME, txMtdPath + "/Dummycode/" + TfUtils.DCD_FILE_NAME);
-               }
-       }
-       
-       /**
-        * Helper function to determine the number of columns after applying
-        * transformations. Note that dummycoding changes the number of columns.
-        * 
-        * @param fs file system
-        * @param header header line
-        * @param delim delimiter
-        * @param tfMtdPath transform metadata path
-        * @return number of columns after applying transformations
-        * @throws IllegalArgumentException if IllegalArgumentException occurs
-        * @throws IOException if IOException occurs
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        * @throws JSONException  if JSONException occurs
-        */
-       private static int getNumColumnsTf(FileSystem fs, String header, String delim, String tfMtdPath) throws IllegalArgumentException, IOException, DMLRuntimeException, JSONException {
-               String[] columnNames = Pattern.compile(Pattern.quote(delim)).split(header, -1);
-               int ret = columnNames.length;
-               
-               JSONObject spec = null;
-               try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(tfMtdPath + "/spec.json"))))) {
-                       spec = JSONHelper.parse(br);
-               }
-               
-               // fetch relevant attribute lists
-               if ( !spec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) )
-                       return ret;
-               
-               JSONArray dcdList = (JSONArray) ((JSONObject)spec.get(TfUtils.TXMETHOD_DUMMYCODE)).get(TfUtils.JSON_ATTRS);
-
-               // look for numBins among binned columns
-               for(Object o : dcdList) 
-               {
-                       int id = UtilFunctions.toInt(o);
-                       
-                       Path binpath = new Path( tfMtdPath + "/Bin/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
-                       Path rcdpath = new Path( tfMtdPath + "/Recode/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_RCD_DISTINCT_SUFFIX);
-                       
-                       if ( TfUtils.checkValidInputFile(fs, binpath, false ) )
-                       {
-                               int nbins = -1;
-                               try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(binpath)))) {
-                                       nbins = UtilFunctions.parseToInt(br.readLine().split(TfUtils.TXMTD_SEP)[4]);
-                               }
-                               ret += (nbins-1);
-                       }
-                       else if ( TfUtils.checkValidInputFile(fs, rcdpath, false ) )
-                       {
-                               int ndistinct = -1;
-                               try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(rcdpath))) ) {
-                                       ndistinct = UtilFunctions.parseToInt(br.readLine());
-                               }
-                               ret += (ndistinct-1);
-                       }
-                       else
-                               throw new DMLRuntimeException("Relevant transformation metadata for column (id=" + id + ", name=" + columnNames[id-1] + ") is not found.");
-               }
-               
-               return ret;
-       }
-       
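
As a quick sanity check on the arithmetic in getNumColumnsTf, each dummycoded column contributes (nbins - 1) or (ndistinct - 1) extra columns on top of the original column count; a stand-alone illustration with hypothetical numbers and a hypothetical NumColumnsTfDemo class:

public class NumColumnsTfDemo {
    public static void main(String[] args) {
        int numColumns = 5;                    // columns before transformation
        int[] distinctPerDummycoded = {3, 4};  // distinct values of two dummycoded columns
        int numColumnsTf = numColumns;
        for (int d : distinctPerDummycoded)
            numColumnsTf += (d - 1);           // each such column becomes d indicator columns
        System.out.println(numColumnsTf);      // 5 + (3-1) + (4-1) = 10
    }
}
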
-       /**
-        * Main method to create and/or apply transformation metadata using MapReduce.
-        * 
-        * @param jobinst MR job instruction
-        * @param inputs array of input matrices
-        * @param shuffleInst shuffle instructions
-        * @param otherInst other instructions
-        * @param resultIndices byte array of result indices
-        * @param outputs array of output matrices
-        * @param numReducers number of reducers
-        * @param replication ?
-        * @return MR job result
-        * @throws Exception if IOException occurs
-        */
-       public static JobReturn mrDataTransform(MRJobInstruction jobinst, MatrixObject[] inputs, String shuffleInst, String otherInst, byte[] resultIndices, MatrixObject[] outputs, int numReducers, int replication) throws Exception {
-               
-               String[] insts = shuffleInst.split(Instruction.INSTRUCTION_DELIM);
-               
-               // Parse transform instruction (the first instruction) to obtain relevant fields
-               TransformOperands oprnds = new TransformOperands(insts[0], inputs[0]);
-               
-               JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-               
-               // find the first file in alphabetical ordering of part files in directory inputPath
-               String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath);
-               
-               // find column names
-               FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile);
-               String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile);
-               HashMap<String, Integer> colNamesToIds = processColumnNames(fs, oprnds.inputCSVProperties, headerLine, smallestFile);
-               String outHeader = getOutputHeader(fs, headerLine, oprnds);
-               int numColumns = colNamesToIds.size();
-               
-               int numColumnsTf = 0;
-               long numRowsTf = 0;
-               
-               ArrayList<Integer> csvoutputs= new ArrayList<Integer>();
-               ArrayList<Integer> bboutputs = new ArrayList<Integer>();
-               
-               // divide output objects based on output format (CSV or BinaryBlock)
-               for(int i=0; i < outputs.length; i++) 
-               {
-                       if(outputs[i].getFileFormatProperties() != null
-                                       && outputs[i].getFileFormatProperties().getFileFormat() == FileFormatProperties.FileFormat.CSV)
-                               csvoutputs.add(i);
-                       else
-                               bboutputs.add(i);
-               }
-               boolean isCSV = (csvoutputs.size() > 0);
-               boolean isBB  = (bboutputs.size()  > 0);
-               String tmpPath = MRJobConfiguration.constructTempOutputFilename();
-               
-               checkIfOutputOverlapsWithTxMtd(outputs, oprnds, isCSV, isBB, csvoutputs, bboutputs, fs);
-               
-               JobReturn retCSV = null, retBB = null;
-               
-               if (!oprnds.isApply) {
-                       // build specification file with column IDs instead of column names
-                       String specWithIDs = processSpecFile(fs, oprnds.inputPath,
-                                                       smallestFile, colNamesToIds, oprnds.inputCSVProperties, oprnds.spec);
-                       colNamesToIds = null; // enable GC on colNamesToIds
-
-                       // Build transformation metadata, including recode maps, bin definitions, etc.
-                       // Also, generate part offsets file (counters file), which is to be used in csv-reblock
-                       
-                       String partOffsetsFile = MRJobConfiguration.constructTempOutputFilename();
-                       numRowsTf = GenTfMtdMR.runJob(oprnds.inputPath, oprnds.txMtdPath, specWithIDs, smallestFile,
-                                       partOffsetsFile, oprnds.inputCSVProperties, numColumns, replication, outHeader);
-                       
-                       if ( numRowsTf == 0 )
-                               throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS);
-                       
-                       // store the specFileWithIDs as transformation metadata
-                       MapReduceTool.writeStringToHDFS(specWithIDs, oprnds.txMtdPath + "/" + "spec.json");
-                       
-                       numColumnsTf = getNumColumnsTf(fs, outHeader, oprnds.inputCSVProperties.getDelim(), oprnds.txMtdPath);
-                       
-                       // Apply transformation metadata, and perform actual transformation
-                       if(isCSV)
-                               retCSV = ApplyTfCSVMR.runJob(oprnds.inputPath, specWithIDs, oprnds.txMtdPath, tmpPath,
-                                       outputs[csvoutputs.get(0)].getFileName(), partOffsetsFile,
-                                       oprnds.inputCSVProperties, numColumns, replication, outHeader);
-                       
-                       if(isBB)
-                       {
-                               DMLConfig conf = ConfigurationManager.getDMLConfig();
-                               int blockSize = conf.getIntValue(DMLConfig.DEFAULT_BLOCK_SIZE);
-                               CSVReblockInstruction rblk = prepDummyReblockInstruction(oprnds.inputCSVProperties, blockSize);
-                               
-                               AssignRowIDMRReturn ret1 = CSVReblockMR.runAssignRowIDMRJob(new String[]{oprnds.inputPath},
-                                                       new InputInfo[]{InputInfo.CSVInputInfo}, new int[]{blockSize}, new int[]{blockSize},
-                                                       rblk.toString(), replication, new String[]{smallestFile}, true,
-                                                       oprnds.inputCSVProperties.getNAStrings(), specWithIDs);
-                               if ( ret1.rlens[0] == 0 )
-                                       throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS);
-                                       
-                               retBB = ApplyTfBBMR.runJob(oprnds.inputPath, insts[1], otherInst,
-                                                       specWithIDs, oprnds.txMtdPath, tmpPath, outputs[bboutputs.get(0)].getFileName(),
-                                                       ret1.counterFile.toString(), oprnds.inputCSVProperties, numRowsTf, numColumns,
-                                                       numColumnsTf, replication, outHeader);
-                       }
-                       
-                       MapReduceTool.deleteFileIfExistOnHDFS(new Path(partOffsetsFile), job);
-                               
-               }
-               else {
-                       colNamesToIds = null; // enable GC on colNamesToIds
-                       
-                       // copy given transform metadata (applyTxPath) to specified location (txMtdPath)
-                       MapReduceTool.deleteFileIfExistOnHDFS(new Path(oprnds.txMtdPath), job);
-                       MapReduceTool.copyFileOnHDFS(oprnds.applyTxPath, oprnds.txMtdPath);
-                       
-                       // path to specification file
-                       String specWithIDs = (oprnds.spec != null) ? oprnds.spec :
-                               MapReduceTool.readStringFromHDFSFile(oprnds.txMtdPath + "/" + "spec.json");
-                       numColumnsTf = getNumColumnsTf(fs, outHeader,
-                                       oprnds.inputCSVProperties.getDelim(),
-                                       oprnds.txMtdPath);
-                       
-                       if (isCSV) 
-                       {
-                               DMLConfig conf = ConfigurationManager.getDMLConfig();
-                               int blockSize = conf.getIntValue(DMLConfig.DEFAULT_BLOCK_SIZE);
-                               CSVReblockInstruction rblk = prepDummyReblockInstruction(oprnds.inputCSVProperties, blockSize);
-                               
-                               AssignRowIDMRReturn ret1 = CSVReblockMR.runAssignRowIDMRJob(new String[]{oprnds.inputPath},
-                                                       new InputInfo[]{InputInfo.CSVInputInfo}, new int[]{blockSize}, new int[]{blockSize},
-                                                       rblk.toString(), replication, new String[]{smallestFile}, true,
-                                                       oprnds.inputCSVProperties.getNAStrings(), specWithIDs);
-                               numRowsTf = ret1.rlens[0];
-                               
-                               if ( ret1.rlens[0] == 0 )
-                                       throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS);
-                                       
-                               // Apply transformation metadata, and perform actual transformation
-                               retCSV = ApplyTfCSVMR.runJob(oprnds.inputPath, specWithIDs, oprnds.applyTxPath, tmpPath,
-                                                       outputs[csvoutputs.get(0)].getFileName(), ret1.counterFile.toString(),
-                                                       oprnds.inputCSVProperties, numColumns, replication, outHeader);
-                       }
-                       
-                       if(isBB) 
-                       {
-                               // compute part offsets file
-                               CSVReblockInstruction rblk = (CSVReblockInstruction) InstructionParser.parseSingleInstruction(insts[1]);
-                               CSVReblockInstruction newrblk = (CSVReblockInstruction) rblk.clone((byte)0);
-                               AssignRowIDMRReturn ret1 = CSVReblockMR.runAssignRowIDMRJob(new String[]{oprnds.inputPath},
-                                                       new InputInfo[]{InputInfo.CSVInputInfo}, new int[]{newrblk.brlen}, new int[]{newrblk.bclen},
-                                                       newrblk.toString(), replication, new String[]{smallestFile}, true,
-                                                       oprnds.inputCSVProperties.getNAStrings(), specWithIDs);
-                               numRowsTf = ret1.rlens[0];
-                               
-                               if ( ret1.rlens[0] == 0 )
-                                       throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS);
-                               
-                               // apply transformation metadata, as well as reblock the resulting data
-                               retBB = ApplyTfBBMR.runJob(oprnds.inputPath, insts[1], otherInst, specWithIDs,
-                                                       oprnds.txMtdPath, tmpPath, outputs[bboutputs.get(0)].getFileName(),
-                                                       ret1.counterFile.toString(), oprnds.inputCSVProperties, ret1.rlens[0],
-                                                       ret1.clens[0], numColumnsTf, replication, outHeader);
-                       }
-               }
-               
-               // copy auxiliary data (old and new header lines) from temporary location to txMtdPath
-               moveFilesFromTmp(fs, tmpPath, oprnds.txMtdPath);
-
-               // generate matrix metadata file for outputs
-               if ( retCSV != null ) 
-               {
-                       retCSV.getMatrixCharacteristics(0).setDimension(numRowsTf, numColumnsTf);
-                       
-                       CSVFileFormatProperties prop = new CSVFileFormatProperties(
-                                       false,
-                                       oprnds.inputCSVProperties.getDelim(), // use the same header as the input
-                                       false, Double.NaN, null);
-                       
-                       MapReduceTool.writeMetaDataFile (outputs[csvoutputs.get(0)].getFileName()+".mtd",
-                                       ValueType.DOUBLE, retCSV.getMatrixCharacteristics(0),
-                                       OutputInfo.CSVOutputInfo, prop);
-                       return retCSV;
-               }
-
-               if ( retBB != null )
-               {
-                       retBB.getMatrixCharacteristics(0).setDimension(numRowsTf, numColumnsTf);
-                       
-                       MapReduceTool.writeMetaDataFile (outputs[bboutputs.get(0)].getFileName()+".mtd",
-                                       ValueType.DOUBLE, retBB.getMatrixCharacteristics(0), OutputInfo.BinaryBlockOutputInfo);
-                       return retBB;
-               }
-               
-               return null;
-                       
-       }
-       
-       private static CSVReblockInstruction prepDummyReblockInstruction(CSVFileFormatProperties prop, int blockSize) {
-               StringBuilder sb = new StringBuilder();
-               sb.append( ExecType.MR );
-               
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( CSVReBlock.OPCODE );
-               
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( "0" );
-               sb.append(Lop.DATATYPE_PREFIX);
-               sb.append(DataType.MATRIX);
-               sb.append(Lop.VALUETYPE_PREFIX);
-               sb.append(ValueType.DOUBLE);
-               
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( "1" );
-               sb.append(Lop.DATATYPE_PREFIX);
-               sb.append(DataType.MATRIX);
-               sb.append(Lop.VALUETYPE_PREFIX);
-               sb.append(ValueType.DOUBLE);
-               
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( blockSize );
-               
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( blockSize );
-               
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( prop.hasHeader() );
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( prop.getDelim() );
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( prop.isFill() );
-               sb.append( Lop.OPERAND_DELIMITOR );
-               sb.append( prop.getFillValue() );
-
-               return (CSVReblockInstruction) CSVReblockInstruction.parseInstruction(sb.toString());
-       }
-
-       private static String getOutputHeader(FileSystem fs, String headerLine, TransformOperands oprnds) throws IOException
-       {
-               String ret = null;
-               
-               if(oprnds.isApply)
-               {
-                       try( BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.applyTxPath + "/" + TfUtils.TXMTD_COLNAMES)) )) ) {
-                               ret = br.readLine();
-                       }
-               }
-               else {
-                       if ( oprnds.outNamesFile == null )
-                               ret = headerLine;
-                       else {
-                               try( BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.outNamesFile)) )) ) {
-                                       ret = br.readLine();
-                               }
-                       }
-               }
-               
-               return ret;
-       }
-       
-       /**
-        * Main method to create and/or apply transformation metadata in-memory, on a
-        * single node.
-        * 
-        * @param inst parameterized built-in CP instruction
-        * @param inputs array of input data
-        * @param outputs array of output matrices
-        * @return MR job result
-        * @throws IOException if IOException occurs
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        * @throws IllegalArgumentException if IllegalArgumentException occurs
-        * @throws JSONException if JSONException occurs
-        */
-       public static JobReturn cpDataTransform(ParameterizedBuiltinCPInstruction inst, CacheableData<?>[] inputs, MatrixObject[] outputs) throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException {
-               TransformOperands oprnds = new TransformOperands(inst.getParameterMap(), inputs[0]);
-               return cpDataTransform(oprnds, inputs, outputs);
-       }
-
-       public static JobReturn cpDataTransform(String inst, CacheableData<?>[] inputs, MatrixObject[] outputs)
-               throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException
-       {
-               String[] insts = inst.split(Instruction.INSTRUCTION_DELIM);
-               // Parse transform instruction (the first instruction) to obtain relevant fields
-               TransformOperands oprnds = new TransformOperands(insts[0], inputs[0]);
-               
-               return cpDataTransform(oprnds, inputs, outputs);
-       }
-               
-       public static JobReturn cpDataTransform(TransformOperands oprnds, CacheableData<?>[] inputs, MatrixObject[] outputs)
-               throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException
-       {
-               JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-               
-               // find the first file in alphabetical ordering of partfiles in directory inputPath
-               String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath);
-               FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile);
-               
-               // find column names
-               String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile);
-               HashMap<String, Integer> colNamesToIds = processColumnNames(fs, oprnds.inputCSVProperties, headerLine, smallestFile);
-               String outHeader = getOutputHeader(fs, headerLine, oprnds);
-               
-               ArrayList<Integer> csvoutputs= new ArrayList<Integer>();
-               ArrayList<Integer> bboutputs = new ArrayList<Integer>();
-               
-               // divide output objects based on output format (CSV or BinaryBlock)
-               for(int i=0; i < outputs.length; i++) 
-               {
-                       if(outputs[i].getFileFormatProperties() != null && outputs[i].getFileFormatProperties().getFileFormat() == FileFormatProperties.FileFormat.CSV)
-                               csvoutputs.add(i);
-                       else
-                               bboutputs.add(i);
-               }
-               boolean isCSV = (csvoutputs.size() > 0);
-               boolean isBB  = (bboutputs.size()  > 0);
-               
-               checkIfOutputOverlapsWithTxMtd(outputs, oprnds, isCSV, isBB, 
csvoutputs, bboutputs, fs);
-               
-               JobReturn ret = null;
-               
-               if (!oprnds.isApply) {
-                       // build specification file with column IDs instead of column names
-                       String specWithIDs = processSpecFile(fs, 
oprnds.inputPath, smallestFile, colNamesToIds, oprnds.inputCSVProperties, 
-                                       oprnds.spec);
-                       MapReduceTool.writeStringToHDFS(specWithIDs, 
oprnds.txMtdPath + "/" + "spec.json");
-       
-                       ret = performTransform(job, fs, oprnds.inputPath, 
colNamesToIds.size(), oprnds.inputCSVProperties, specWithIDs, 
-                                       oprnds.txMtdPath, oprnds.isApply, 
outputs[0], outHeader, isBB, isCSV );
-               }
-               else {
-                       // copy given transform metadata (applyTxPath) to 
specified location (txMtdPath)
-                       MapReduceTool.deleteFileIfExistOnHDFS(new 
Path(oprnds.txMtdPath), job);
-                       MapReduceTool.copyFileOnHDFS(oprnds.applyTxPath, 
oprnds.txMtdPath);
-                       
-                       // path to specification file (optionally specified)
-                       String specWithIDs = (oprnds.spec != null) ? 
-                               oprnds.spec : 
MapReduceTool.readStringFromHDFSFile(oprnds.txMtdPath + "/" + "spec.json");
-                       
-                       ret = performTransform(job, fs, oprnds.inputPath, 
colNamesToIds.size(), oprnds.inputCSVProperties, specWithIDs,  
-                                       oprnds.txMtdPath, oprnds.isApply, 
outputs[0], outHeader, isBB, isCSV );
-               }
-               
-               return ret;
-       }
-       
-       /**
-        * Helper function to fetch and sort the list of part files under the 
given
-        * input directory.
-        * 
-        * @param input input directory
-        * @param fs file system
-        * @return list of paths to input file parts
-        * @throws FileNotFoundException if FileNotFoundException occurs
-        * @throws IOException if IOException occurs
-        */
-       @SuppressWarnings("unchecked")
-       private static ArrayList<Path> collectInputFiles(String input, 
FileSystem fs) throws FileNotFoundException, IOException 
-       {
-               Path path = new Path(input);
-               ArrayList<Path> files=new ArrayList<Path>();
-               if(fs.isDirectory(path))
-               {
-                       for(FileStatus stat: fs.listStatus(path, 
CSVReblockMR.hiddenFileFilter))
-                               files.add(stat.getPath());
-                       Collections.sort(files);
-               }
-               else
-                       files.add(path);
-
-               return files;
-       }
-       
-       private static int[] countNumRows(ArrayList<Path> files, 
CSVFileFormatProperties prop, FileSystem fs, TfUtils agents) throws IOException 
-       {
-               int[] rows = new int[2];
-               int numRows=0, numRowsTf=0;
-               
-               OmitAgent oa = agents.getOmitAgent();
-               
-               if(!oa.isApplicable())
-               {
-                       for(int fileNo=0; fileNo<files.size(); fileNo++)
-                       {
-                               try(BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(files.get(fileNo))))) {
-                                       if(fileNo==0 && prop.hasHeader() ) 
-                                               br.readLine(); //ignore header
-                                       
-                                       while ( br.readLine() != null)
-                                               numRows++;
-                               }
-                       }
-                       numRowsTf = numRows;
-               }
-               else
-               {
-                       String line = null;
-                       String[] words;
-                       
-                       Pattern delim = 
Pattern.compile(Pattern.quote(prop.getDelim()));
-                       
-                       for(int fileNo=0; fileNo<files.size(); fileNo++)
-                       {
-                               try( BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(files.get(fileNo)))) ) {
-                                       if(fileNo==0 && prop.hasHeader() ) 
-                                               br.readLine(); //ignore header
-                                       while ( (line=br.readLine()) != null)
-                                       {
-                                               numRows++;
-                                               
-                                               words = delim.split(line, -1);
-                                               if(!oa.omit(words, agents))
-                                                       numRowsTf++;
-                                       }
-                               }
-                       }
-               }
-               
-               rows[0] = numRows;
-               rows[1] = numRowsTf;
-               
-               return rows;
-       }
-       
-       /**
-        * Main method to create and/or apply transformation metadata in-memory, on a single node.
-        * 
-        * @param job job configuration
-        * @param fs file system
-        * @param inputPath path to input files
-        * @param ncols number of columns
-        * @param prop csv file format properties
-        * @param specWithIDs JSON transform specification with IDs
-        * @param tfMtdPath transform metadata path
-        * @param isApply true if existing transform metadata is applied, false if new metadata is created
-        * @param result output matrix
-        * @param headerLine header line
-        * @param isBB true if binary block
-        * @param isCSV true if CSV
-        * @return MR job result
-        * @throws IOException if IOException occurs
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        * @throws IllegalArgumentException if IllegalArgumentException occurs
-        * @throws JSONException if JSONException occurs
-        */
-       private static JobReturn performTransform(JobConf job, FileSystem fs, 
String inputPath, int ncols, CSVFileFormatProperties prop, String specWithIDs, 
String tfMtdPath, boolean isApply, MatrixObject result, String headerLine, 
boolean isBB, boolean isCSV ) throws IOException, DMLRuntimeException, 
IllegalArgumentException, JSONException {
-               
-               String[] na = TfUtils.parseNAStrings(prop.getNAStrings());
-               
-               JSONObject spec = new JSONObject(specWithIDs);
-               TfUtils agents = new TfUtils(headerLine, prop.hasHeader(), 
prop.getDelim(), na, spec, ncols, tfMtdPath, null, null );
-               
-               MVImputeAgent _mia = agents.getMVImputeAgent();
-               RecodeAgent _ra = agents.getRecodeAgent();
-               BinAgent _ba = agents.getBinAgent();
-               DummycodeAgent _da = agents.getDummycodeAgent();
-
-               // List of files to read
-               ArrayList<Path> files = collectInputFiles(inputPath, fs);
-                               
-               // ---------------------------------
-               // Construct transformation metadata
-               // ---------------------------------
-               
-               String line = null;
-               String[] words  = null;
-               
-               int numColumnsTf=0;
-               
-               if (!isApply) {
-                       for(int fileNo=0; fileNo<files.size(); fileNo++)
-                       {
-                               try(BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(files.get(fileNo)))) ) {
-                                       if(fileNo==0 && prop.hasHeader() ) 
-                                               br.readLine(); //ignore header
-                                       
-                                       line = null;
-                                       while ( (line = br.readLine()) != null) 
{
-                                               agents.prepareTfMtd(line);
-                                       }
-                               }
-                       }
-                       
-                       if(agents.getValid() == 0) 
-                               throw new 
DMLRuntimeException(ERROR_MSG_ZERO_ROWS);
-                       
-                       _mia.outputTransformationMetadata(tfMtdPath, fs, 
agents);
-                       _ba.outputTransformationMetadata(tfMtdPath, fs, agents);
-                       _ra.outputTransformationMetadata(tfMtdPath, fs, agents);
-               
-                       // prepare agents for the subsequent phase of applying 
transformation metadata
-                       
-                       // NO need to loadTxMtd for _ra, since the maps are 
already present in the memory
-                       Path tmp = new Path(tfMtdPath);
-                       _mia.loadTxMtd(job, fs, tmp, agents);
-                       _ba.loadTxMtd(job, fs, tmp, agents);
-                       
-                       _da.setRecodeMapsCP( _ra.getCPRecodeMaps() );
-                       _da.setNumBins(_ba.getColList(), _ba.getNumBins());
-                       _da.loadTxMtd(job, fs, tmp, agents);
-               }
-               else {
-                       // Count the number of rows
-                       int rows[] = countNumRows(files, prop, fs, agents);
-                       agents.setTotal(rows[0]);
-                       agents.setValid(rows[1]);
-                       
-                       if(agents.getValid() == 0) 
-                               throw new DMLRuntimeException("Number of rows in the transformed output (potentially, after omitting the ones with missing values) is zero. Cannot proceed.");
-                       
-                       // Load transformation metadata
-                       // prepare agents for the subsequent phase of applying 
transformation metadata
-                       Path tmp = new Path(tfMtdPath);
-                       _mia.loadTxMtd(job, fs, tmp, agents);
-                       _ra.loadTxMtd(job, fs, tmp, agents);
-                       _ba.loadTxMtd(job, fs, tmp, agents);
-                       
-                       _da.setRecodeMaps( _ra.getRecodeMaps() );
-                       _da.setNumBins(_ba.getColList(), _ba.getNumBins());
-                       _da.loadTxMtd(job, fs, tmp, agents);
-               }
-               
-               // -----------------------------
-               // Apply transformation metadata
-               // -----------------------------
-        
-               numColumnsTf = getNumColumnsTf(fs, headerLine, prop.getDelim(), 
tfMtdPath);
-
-               MapReduceTool.deleteFileIfExistOnHDFS(result.getFileName());
-               BufferedWriter out=new BufferedWriter(new 
OutputStreamWriter(fs.create(new Path(result.getFileName()),true)));          
-               StringBuilder sb = new StringBuilder();
-               
-               try {
-                       MatrixBlock mb = null; 
-                       if ( isBB ) 
-                       {
-                               int estNNZ = (int)agents.getValid() * ncols;
-                               mb = new MatrixBlock((int)agents.getValid(), 
numColumnsTf, estNNZ );
-                               
-                               if ( mb.isInSparseFormat() )
-                                       mb.allocateSparseRowsBlock();
-                               else
-                                       mb.allocateDenseBlock();
-                       }
-       
-                       int rowID = 0; // rowid to be used in filling the 
matrix block
-                       
-                       for(int fileNo=0; fileNo<files.size(); fileNo++)
-                       {
-                               try( BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(files.get(fileNo)))) ) {
-                                       if ( fileNo == 0 ) 
-                                       {
-                                               if ( prop.hasHeader() )
-                                                       br.readLine(); // 
ignore the header line from data file
-                                               
-                                               //TODO: fix hard-wired header propagation to metadata column names
-                                               
-                                               String dcdHeader = 
_da.constructDummycodedHeader(headerLine, agents.getDelim());
-                                               numColumnsTf = 
_da.genDcdMapsAndColTypes(fs, tfMtdPath, ncols, agents);
-                                               generateHeaderFiles(fs, 
tfMtdPath, headerLine, dcdHeader);
-                                       }
-                                       
-                                       line = null;
-                                       while ( (line = br.readLine()) != null) 
{
-                                               words = agents.getWords(line);
-               
-                                               if(!agents.omit(words))
-                                               {
-                                                       words = 
agents.apply(words);
-                       
-                                                       if (isCSV)
-                                                       {
-                                                               out.write( 
agents.checkAndPrepOutputString(words, sb) );
-                                                               out.write("\n");
-                                                       }
-                                                       
-                                                       if( isBB ) 
-                                                       {
-                                                               agents.check(words);
-                                                               for(int c=0; c<words.length; c++)
-                                                               {
-                                                                       if(words[c] == null || words[c].isEmpty())
-                                                                               ;
-                                                                       else 
-                                                                               mb.appendValue(rowID, c, UtilFunctions.parseToDouble(words[c]));
-                                                               }
-                                                       }
-                                                       rowID++;
-                                               }
-                                       }
-                               }
-                       }
-                       
-                       if(mb != null)
-                       {
-                               mb.recomputeNonZeros();
-                               mb.examSparsity();
-                               
-                               result.acquireModify(mb);
-                               result.release();
-                               result.exportData();
-                       }
-               }
-               finally {
-                       IOUtilFunctions.closeSilently(out);
-               }
-               
-               MatrixCharacteristics mc = new 
MatrixCharacteristics(agents.getValid(), numColumnsTf, (int) 
result.getNumRowsPerBlock(), (int) result.getNumColumnsPerBlock());
-               JobReturn ret = new JobReturn(new MatrixCharacteristics[]{mc}, 
true);
-
-               return ret;
-       }
-       
-       public static void generateHeaderFiles(FileSystem fs, String txMtdDir, 
String origHeader, String newHeader) throws IOException {
-               // write out given header line
-               try( BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(
-                               fs.create(new Path(txMtdDir+"/" + 
TfUtils.TXMTD_COLNAMES),true)))) {
-                       br.write(origHeader+"\n");
-               }
-
-               // write out the new header line (after all transformations)
-               try( BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(
-                               fs.create(new Path(txMtdDir + "/" + 
TfUtils.TXMTD_DC_COLNAMES),true))) ){
-                       br.write(newHeader+"\n");
-               }
-       }
-       
-       private static void checkIfOutputOverlapsWithTxMtd(MatrixObject[] 
outputs, TransformOperands oprnds,
-                       boolean isCSV, boolean isBB, ArrayList<Integer> 
csvoutputs, ArrayList<Integer> bboutputs, FileSystem fs) throws 
DMLRuntimeException {
-               if(isCSV) {
-                       checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, 
outputs[csvoutputs.get(0)].getFileName(), fs);
-               }
-               else if(isBB) {
-                       checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, 
outputs[bboutputs.get(0)].getFileName(), fs);
-               }
-       }
-       
-       @SuppressWarnings("deprecation")
-       private static void checkIfOutputOverlapsWithTxMtd(String txMtdPath, 
String outputPath, FileSystem fs) 
-               throws DMLRuntimeException 
-       {
-               Path path1 = new Path(txMtdPath).makeQualified(fs);
-               Path path2 = new Path(outputPath).makeQualified(fs);
-               
-               String fullTxMtdPath = path1.toString();
-               String fullOutputPath = path2.toString();
-               
-               
if(path1.getParent().toString().equals(path2.getParent().toString())) {
-                       // Both txMtdPath and outputPath are in same folder, 
but outputPath can have suffix 
-                       if(fullTxMtdPath.equals(fullOutputPath)) {
-                               throw new DMLRuntimeException("The transform 
path \'" + txMtdPath 
-                                               + "\' cannot overlap with the 
output path \'" + outputPath + "\'");
-                       }
-               }
-               else if(fullTxMtdPath.startsWith(fullOutputPath) || 
fullOutputPath.startsWith(fullTxMtdPath)) {
-                       throw new DMLRuntimeException("The transform path \'" + 
txMtdPath 
-                                       + "\' cannot overlap with the output 
path \'" + outputPath + "\'");
-               }
-       }
-       
-       public static void spDataTransform(ParameterizedBuiltinSPInstruction 
inst, FrameObject[] inputs, MatrixObject[] outputs, ExecutionContext ec) throws 
Exception {
-               
-               SparkExecutionContext sec = (SparkExecutionContext)ec;
-               
-               // Parse transform instruction (the first instruction) to 
obtain relevant fields
-               TransformOperands oprnds = new 
TransformOperands(inst.getParams(), inputs[0]);
-               
-               JobConf job = new JobConf();
-               FileSystem fs = 
IOUtilFunctions.getFileSystem(inputs[0].getFileName());
-               
-               checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, 
outputs[0].getFileName(), fs);
-               
-               // find the first file in alphabetical ordering of partfiles in 
directory inputPath 
-               String smallestFile = CSVReblockMR.findSmallestFile(job, 
oprnds.inputPath);
-               
-               // find column names and construct output header
-               String headerLine = readHeaderLine(fs, 
oprnds.inputCSVProperties, smallestFile);
-               HashMap<String, Integer> colNamesToIds = processColumnNames(fs, 
oprnds.inputCSVProperties, headerLine, smallestFile);
-               int numColumns = colNamesToIds.size();
-               String outHeader = getOutputHeader(fs, headerLine, oprnds);
-               
-               String tmpPath = 
MRJobConfiguration.constructTempOutputFilename();
-               
-               // Construct RDD for input data
-               @SuppressWarnings("unchecked")
-               JavaPairRDD<LongWritable, Text> inputData = 
(JavaPairRDD<LongWritable, Text>) sec.getRDDHandleForFrameObject(inputs[0], 
InputInfo.CSVInputInfo);
-               JavaRDD<Tuple2<LongWritable,Text>> csvLines = 
JavaPairRDD.toRDD(inputData).toJavaRDD();
-               
-               long numRowsTf=0, numColumnsTf=0;
-               JavaPairRDD<Long, String> tfPairRDD = null;
-               
-               if (!oprnds.isApply) {
-                       // build specification file with column IDs instead of column names
-                       String specWithIDs = processSpecFile(fs, 
oprnds.inputPath, smallestFile,
-                                               colNamesToIds, 
oprnds.inputCSVProperties, oprnds.spec);
-                       colNamesToIds = null; // enable GC on colNamesToIds
-
-                       // Build transformation metadata, including recode 
maps, bin definitions, etc.
-                       // Also, generate part offsets file (counters file), 
which is to be used in csv-reblock (if needed)
-                       String partOffsetsFile =  
MRJobConfiguration.constructTempOutputFilename();
-                       numRowsTf = GenTfMtdSPARK.runSparkJob(sec, csvLines, oprnds.txMtdPath,
-                                       specWithIDs, partOffsetsFile,
-                                       oprnds.inputCSVProperties, numColumns, outHeader);
-                       
-                       // store the specFileWithIDs as transformation metadata
-                       MapReduceTool.writeStringToHDFS(specWithIDs, 
oprnds.txMtdPath + "/" + "spec.json");
-                       
-                       numColumnsTf = getNumColumnsTf(fs, outHeader, 
oprnds.inputCSVProperties.getDelim(), oprnds.txMtdPath);
-                       
-                       tfPairRDD = ApplyTfCSVSPARK.runSparkJob(sec, csvLines, 
oprnds.txMtdPath, 
-                                       specWithIDs, tmpPath, 
oprnds.inputCSVProperties, numColumns, outHeader);
-
-                       
-                       MapReduceTool.deleteFileIfExistOnHDFS(new 
Path(partOffsetsFile), job);
-               }
-               else {
-                       colNamesToIds = null; // enable GC on colNamesToIds
-                       
-                       // copy given transform metadata (applyTxPath) to 
specified location (txMtdPath)
-                       MapReduceTool.deleteFileIfExistOnHDFS(new 
Path(oprnds.txMtdPath), job);
-                       MapReduceTool.copyFileOnHDFS(oprnds.applyTxPath, 
oprnds.txMtdPath);
-                       
-                       // path to specification file
-                       String specWithIDs = (oprnds.spec != null) ? 
oprnds.spec :
-                                       
MapReduceTool.readStringFromHDFSFile(oprnds.txMtdPath + "/" + "spec.json");
-                       numColumnsTf = getNumColumnsTf(fs, outHeader,
-                                       oprnds.inputCSVProperties.getDelim(), oprnds.txMtdPath);
-                       
-                       // Apply transformation metadata, and perform actual 
transformation 
-                       tfPairRDD = ApplyTfCSVSPARK.runSparkJob(sec, csvLines, 
oprnds.txMtdPath, 
-                                       specWithIDs, tmpPath, 
oprnds.inputCSVProperties, numColumns, outHeader);
-                       
-               }
-               
-               // copy auxiliary data (old and new header lines) from 
temporary location to txMtdPath
-               moveFilesFromTmp(fs, tmpPath, oprnds.txMtdPath);
-
-               // convert to csv output format (serialized longwritable/text)
-               JavaPairRDD<LongWritable, Text> outtfPairRDD = 
-                               
RDDConverterUtils.stringToSerializableText(tfPairRDD);
-               
-               if ( outtfPairRDD != null ) 
-               {
-                       MatrixObject outMO = outputs[0];
-                       String outVar = outMO.getVarName();
-                       outMO.setRDDHandle(new RDDObject(outtfPairRDD, outVar));
-                       sec.addLineageRDD(outVar, 
inst.getParams().get("target"));
-                       
-                       //update output statistics (required for correctness)
-                       MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(outVar);
-                       mcOut.setDimension(numRowsTf, numColumnsTf);
-                       mcOut.setNonZeros(-1);
-               }
-       }
-       
-
-       /**
-        * Private class to hold the relevant input parameters to transform 
operation.
-        */
-       private static class TransformOperands 
-       {
-               private String inputPath=null;
-               private String txMtdPath=null;
-               private String applyTxPath=null;
-               private String spec=null;
-               private String outNamesFile=null;
-               private boolean isApply=false;
-               private CSVFileFormatProperties inputCSVProperties = null;
-               
-               private TransformOperands(String inst, CacheableData<?> input) {
-                       inputPath = input.getFileName();
-                       inputCSVProperties = 
(CSVFileFormatProperties)input.getFileFormatProperties();
-                       String[] instParts = 
inst.split(Instruction.OPERAND_DELIM);
-                       txMtdPath = instParts[3];
-                       applyTxPath = instParts[4].startsWith("applymtd=") ? 
instParts[4].substring(9) : null;
-                       isApply = (applyTxPath != null);
-                       int pos = (applyTxPath != null) ? 5 : 4;
-                       if( pos<instParts.length )
-                               spec = instParts[pos].startsWith("spec=") ? 
instParts[pos++].substring(5) : null;
-                       if( pos<instParts.length )
-                               outNamesFile = 
instParts[pos].startsWith("outnames=") ? instParts[pos].substring(9) : null;
-               }
-               
-               private TransformOperands(HashMap<String, String> params, 
CacheableData<?> input) {
-                       inputPath = input.getFileName();
-                       txMtdPath = 
params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_MTD);
-                       spec = 
params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_SPEC);
-                       applyTxPath = 
params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD);
-                       isApply = (applyTxPath != null);
-                       outNamesFile =  
params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_OUTNAMES); // can 
be null
-                       inputCSVProperties = 
(CSVFileFormatProperties)input.getFileFormatProperties();
-               }
-       }
-}
-
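For readers tracing the deleted single-node path: countNumRows() above counts total and retained rows by splitting each line on the literal delimiter (Pattern.quote) and asking the OmitAgent whether a row should be dropped. The following is a minimal, self-contained sketch of that counting pattern, not part of this commit; the CountRowsSketch class name and the empty-field check standing in for OmitAgent.omit() are illustrative assumptions.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.regex.Pattern;

public class CountRowsSketch {
    /** Counts data rows, optionally skipping a header and rows with empty (missing) fields. */
    static int[] countRows(BufferedReader br, String delim, boolean hasHeader) throws IOException {
        Pattern p = Pattern.compile(Pattern.quote(delim)); // treat the delimiter as a literal string
        int numRows = 0, numRowsKept = 0;
        String line;
        if (hasHeader)
            br.readLine(); // ignore header
        while ((line = br.readLine()) != null) {
            numRows++;
            String[] words = p.split(line, -1); // -1 keeps trailing empty fields
            boolean omit = false;
            for (String w : words)
                omit |= w.isEmpty(); // stand-in for OmitAgent.omit(words, agents)
            if (!omit)
                numRowsKept++;
        }
        return new int[] { numRows, numRowsKept };
    }

    public static void main(String[] args) throws IOException {
        String csv = "a,b\n1,2\n3,\n4,5\n";
        int[] rows = countRows(new BufferedReader(new StringReader(csv)), ",", true);
        System.out.println(rows[0] + " total rows, " + rows[1] + " kept"); // 3 total rows, 2 kept
    }
}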

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java 
b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
deleted file mode 100644
index c35e160..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DistinctValue implements Writable, Serializable {
-       
-       private static final long serialVersionUID = -8236705946336974836L;
-
-       private static final byte [] EMPTY_BYTES = new byte[0];
-         
-       // word (distinct value)
-       private byte[] _bytes;
-       private int _length;
-       // count
-       private long _count;
-       
-       public DistinctValue() {
-               _bytes = EMPTY_BYTES;
-               _length = 0;
-               _count = -1;
-       }
-       
-       public DistinctValue(String w, long count) throws 
CharacterCodingException {
-           ByteBuffer bb = Text.encode(w, true);
-           _bytes = bb.array();
-           _length = bb.limit();
-               _count = count;
-       }
-       
-       public DistinctValue(OffsetCount oc) throws CharacterCodingException {
-               this(oc.filename + "," + oc.fileOffset, oc.count);
-       }
-       
-       public void reset() {
-               _bytes = EMPTY_BYTES;
-               _length = 0;
-               _count = -1;
-       }
-       
-       public String getWord() {  
-               return new String( _bytes, 0, _length, Charset.forName("UTF-8") 
); 
-       }
-       
-       public long getCount() { 
-               return _count; 
-       }
-       
-       @Override
-       public void write(DataOutput out) throws IOException {
-           // write word
-               WritableUtils.writeVInt(out, _length);
-           out.write(_bytes, 0, _length);
-               // write count
-           out.writeLong(_count);
-       }
-       
-       @Override
-       public void readFields(DataInput in) throws IOException {
-           // read word 
-               _length = WritableUtils.readVInt(in);
-           _bytes = new byte[_length];
-           in.readFully(_bytes, 0, _length);
-           // read count
-           _count = in.readLong();
-       }
-       
-       public OffsetCount getOffsetCount() {
-               String[] parts = getWord().split(",");
-               return new OffsetCount( parts[0],
-                               UtilFunctions.parseToLong(parts[1]),
-                               getCount() );
-       }
-}
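The deleted DistinctValue serializes a VInt-prefixed UTF-8 word followed by a long count, and readFields() consumes the two in the same order. Below is a small round-trip sketch of that wire format, not part of this commit; it assumes hadoop-common on the classpath and uses only Text.encode, WritableUtils.writeVInt/readVInt, and plain DataOutput/DataInput streams.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

public class DistinctValueWireFormat {
    public static void main(String[] args) throws IOException {
        // serialize: VInt byte length, UTF-8 bytes of the word, then the long count
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        ByteBuffer bb = Text.encode("northwest", true); // same UTF-8 encoding path as the deleted class
        WritableUtils.writeVInt(out, bb.limit());
        out.write(bb.array(), 0, bb.limit());
        out.writeLong(42L);
        out.flush();

        // deserialize in the same order
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        int len = WritableUtils.readVInt(in);
        byte[] word = new byte[len];
        in.readFully(word, 0, len);
        long count = in.readLong();
        System.out.println(new String(word, StandardCharsets.UTF_8) + " -> " + count); // northwest -> 42
    }
}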

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
deleted file mode 100644
index 676b31e..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.transform.encode.Encoder;
-import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
-import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-public class DummycodeAgent extends Encoder 
-{              
-       private static final long serialVersionUID = 5832130477659116489L;
-
-       private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
-       private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
-       private int[] _binList = null;
-       private int[] _numBins = null;
-       
-       private int[] _domainSizes = null;                      // length = #of 
dummycoded columns
-       private int[] _dcdColumnMap = null;                     // to help in 
translating between original and dummycoded column IDs
-       private long _dummycodedLength = 0;                     // #of columns 
after dummycoded
-
-       public DummycodeAgent(JSONObject parsedSpec, String[] colnames, int 
clen) throws JSONException {
-               super(null, clen);
-               
-               if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) {
-                       int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, 
colnames, TfUtils.TXMETHOD_DUMMYCODE);
-                       initColList(collist);
-               }
-       }
-       
-       @Override
-       public int getNumCols() {
-               return (int)_dummycodedLength;
-       }
-       
-       /**
-        * Method to output transformation metadata from the mappers. 
-        * This information is collected and merged by the reducers.
-        */
-       @Override
-       public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException {
-               // There is no metadata required for dummycode.
-               // Required information is output from RecodeAgent.
-               return;
-       }
-       
-       @Override
-       public void 
mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values,
-                       String outputDir, int colID, FileSystem fs, TfUtils 
agents) throws IOException {
-               // Nothing to do here
-       }
-
-       public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> 
maps) {
-               _finalMaps = maps;
-       }
-       
-       public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> 
maps) {
-               _finalMapsCP = maps;
-       }
-       
-       public void setNumBins(int[] binList, int[] numbins) {
-               _binList = binList;
-               _numBins = numbins;
-       }
-       
-       /**
-        * Method to generate dummyCodedMaps.csv, with the range of column IDs 
for each variable in the original data.
-        * 
-        * Each line in the dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end]
-        *              1/0 indicates if ColID is dummycoded or not
-        *              [st,end] is the range of dummycoded column numbers for 
the given ColID
-        * 
-        * It also generates coltypes.csv, with the type (scale, nominal, etc.) 
of columns in the output.
-        * Recoded columns are of type nominal, binned columns are of type ordinal, dummycoded columns are of type
-        * dummycoded, and the remaining are of type scale.
-        * 
-        * @param fs file system
-        * @param txMtdDir path to transform metadata directory
-        * @param numCols number of columns
-        * @param agents transformation agents, used to check bin and recode applicability per column
-        * @return number of columns in the dummycoded output
-        * @throws IOException if IOException occurs
-        */
-       public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int 
numCols, TfUtils agents) throws IOException {
-               
-               // initialize all column types in the transformed data to SCALE
-               TfUtils.ColumnTypes[] ctypes = new TfUtils.ColumnTypes[(int) 
_dummycodedLength];
-               for(int i=0; i < _dummycodedLength; i++)
-                       ctypes[i] = TfUtils.ColumnTypes.SCALE;
-               
-               _dcdColumnMap = new int[numCols];
-
-               int sum=1;
-               try( BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(
-                               fs.create(new Path(txMtdDir+"/Dummycode/" + 
TfUtils.DCD_FILE_NAME),true))) ) {
-                       int idx = 0;
-                       for(int colID=1; colID <= numCols; colID++) 
-                       {
-                               if ( _colList != null && idx < _colList.length 
&& _colList[idx] == colID )
-                               {
-                                       br.write(colID + TfUtils.TXMTD_SEP + 
"1" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + (sum+_domainSizes[idx]-1) + 
"\n");
-                                       _dcdColumnMap[colID-1] = 
(sum+_domainSizes[idx]-1)-1;
-       
-                                       for(int i=sum; i 
<=(sum+_domainSizes[idx]-1); i++)
-                                               ctypes[i-1] = 
TfUtils.ColumnTypes.DUMMYCODED;
-                                       
-                                       sum += _domainSizes[idx];
-                                       idx++;
-                               }
-                               else 
-                               {
-                                       br.write(colID + TfUtils.TXMTD_SEP + 
"0" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + sum + "\n");
-                                       _dcdColumnMap[colID-1] = sum-1;
-                                       
-                                       if ( 
agents.getBinAgent().isApplicable(colID) != -1 )
-                                               ctypes[sum-1] = 
TfUtils.ColumnTypes.ORDINAL;    // binned variable results in an ordinal column
-                                       
-                                       if ( 
agents.getRecodeAgent().isApplicable(colID) != -1 )
-                                               ctypes[sum-1] = 
TfUtils.ColumnTypes.NOMINAL;
-                                       
-                                       sum += 1;
-                               }
-                       }
-               }
-
-               // Write coltypes.csv
-               try(BufferedWriter br=new BufferedWriter(new OutputStreamWriter(
-                               fs.create(new Path(txMtdDir + File.separator + 
TfUtils.TXMTD_COLTYPES),true))) ) {
-                       br.write(ctypes[0].toID() + "");
-                       for(int i = 1; i < _dummycodedLength; i++) 
-                               br.write( TfUtils.TXMTD_SEP + ctypes[i].toID() 
);
-               }
-               
-               return sum-1;
-       }
-       
-       /**
-        * Given a dummycoded column id, find the corresponding original column 
ID.
-        *  
-        * @param colID dummycoded column ID
-        * @return original column ID, -1 if not found
-        */
-       public int mapDcdColumnID(int colID) 
-       {
-               for(int i=0; i < _dcdColumnMap.length; i++)
-               {
-                       int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1);
-                       int end = _dcdColumnMap[i]+1;
-                       //System.out.println((i+1) + ": " + "[" + st + "," + 
end + "]");
-                       
-                       if ( colID >= st && colID <= end)
-                               return i+1;
-               }
-               return -1;
-       }
-       
-       public String constructDummycodedHeader(String header, Pattern delim) {
-               
-               if(_colList == null && _binList == null )
-                       // none of the columns are dummycoded, simply return 
the given header
-                       return header;
-               
-               String[] names = delim.split(header, -1);
-               List<String> newNames = null;
-               
-               StringBuilder sb = new StringBuilder();
-               
-               // Dummycoding can be performed either on a recoded column or on a binned column
-               
-               // process recoded columns
-               if(_finalMapsCP != null && _colList != null) 
-               {
-                       for(int i=0; i <_colList.length; i++) 
-                       {
-                               int colID = _colList[i];
-                               HashMap<String,Long> map = 
_finalMapsCP.get(colID);
-                               String colName = 
UtilFunctions.unquote(names[colID-1]);
-                               
-                               if ( map != null  ) 
-                               {
-                                       // order map entries by their recodeID
-                                       List<Map.Entry<String, Long>> entryList 
= new ArrayList<Map.Entry<String, Long>>(map.entrySet());
-                                       Comparator<Map.Entry<String, Long>> 
comp = new Comparator<Map.Entry<String, Long>>() {
-                                               @Override
-                                               public int 
compare(Entry<String, Long> entry1, Entry<String, Long> entry2) {
-                                                       Long value1 = 
entry1.getValue();
-                                                       Long value2 = 
entry2.getValue();
-                                                       return (int) (value1 - 
value2);
-                                               }
-                                       };
-                                       Collections.sort(entryList, comp);
-                                       newNames = new ArrayList<String>();
-                                       for (Entry<String, Long> entry : 
entryList) {
-                                               newNames.add(entry.getKey());
-                                       }
-                                       
-                                       // construct concatenated string of map 
entries
-                                       sb.setLength(0);
-                                       for(int idx=0; idx < newNames.size(); 
idx++) 
-                                       {
-                                               if(idx==0) 
-                                                       sb.append( colName + 
TfUtils.DCD_NAME_SEP + newNames.get(idx));
-                                               else
-                                                       sb.append( delim + 
colName + TfUtils.DCD_NAME_SEP + newNames.get(idx));
-                                       }
-                                       names[colID-1] = sb.toString(); // replace original column name with dcd name
-                               }
-                       }
-               }
-               else if(_finalMaps != null && _colList != null) {
-                       for(int i=0; i <_colList.length; i++) {
-                               int colID = _colList[i];
-                               HashMap<String,String> map = 
_finalMaps.get(colID);
-                               String colName = 
UtilFunctions.unquote(names[colID-1]);
-                               
-                               if ( map != null ) 
-                               {
-                                       
-                                       // order map entries by their recodeID 
(represented as Strings .. "1", "2", etc.)
-                                       List<Map.Entry<String, String>> 
entryList = new ArrayList<Map.Entry<String, String>>(map.entrySet());
-                                       Comparator<Map.Entry<String, String>> 
comp = new Comparator<Map.Entry<String, String>>() {
-                                               @Override
-                                               public int 
compare(Entry<String, String> entry1, Entry<String, String> entry2) {
-                                                       String value1 = 
entry1.getValue();
-                                                       String value2 = 
entry2.getValue();
-                                                       return 
(Integer.parseInt(value1) - Integer.parseInt(value2));
-                                               }
-                                       };
-                                       Collections.sort(entryList, comp);
-                                       newNames = new ArrayList<String>();
-                                       for (Entry<String, String> entry : 
entryList) {
-                                               newNames.add(entry.getKey());
-                                       }
-                                       
-                                       // construct concatenated string of map 
entries
-                                       sb.setLength(0);
-                                       for(int idx=0; idx < newNames.size(); 
idx++) 
-                                       {
-                                               if(idx==0) 
-                                                       sb.append( colName + 
TfUtils.DCD_NAME_SEP + newNames.get(idx));
-                                               else
-                                                       sb.append( delim + 
colName + TfUtils.DCD_NAME_SEP + newNames.get(idx));
-                                       }
-                                       names[colID-1] = sb.toString(); // replace original column name with dcd name
-                               }
-                       }
-               }
-               
-               // process binned columns
-               if (_binList != null) 
-                       for(int i=0; i < _binList.length; i++) 
-                       {
-                               int colID = _binList[i];
-                               
-                               // need to consider only binned and dummycoded 
columns
-                               if(isApplicable(colID) == -1)
-                                       continue;
-                               
-                               int numBins = _numBins[i];
-                               String colName = 
UtilFunctions.unquote(names[colID-1]);
-                               
-                               sb.setLength(0);
-                               for(int idx=0; idx < numBins; idx++) 
-                                       if(idx==0) 
-                                               sb.append( colName + 
TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) );
-                                       else
-                                               sb.append( delim + colName + 
TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) );
-                               names[colID-1] = sb.toString(); // replace original column name with dcd name
-                       }
-               
-               // Construct the full header
-               sb.setLength(0);
-               for(int colID=0; colID < names.length; colID++) 
-               {
-                       if (colID == 0)
-                               sb.append(names[colID]);
-                       else
-                               sb.append(delim + names[colID]);
-               }
-               //System.out.println("DummycodedHeader: " + sb.toString());
-               
-               return sb.toString();
-       }
-       
-       @Override
-       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
-               if ( !isApplicable() ) {
-                       _dummycodedLength = _clen;
-                       return;
-               }
-               
-               // sort to-be-dummycoded column IDs in ascending order. This is the order in which the new dummycoded record is constructed in the apply() function.
-               Arrays.sort(_colList);  
-               _domainSizes = new int[_colList.length];
-
-               _dummycodedLength = _clen;
-               
-               //HashMap<String, String> map = null;
-               for(int i=0; i<_colList.length; i++) {
-                       int colID = _colList[i];
-                       
-                       // Find the domain size for colID using _finalMaps or 
_finalMapsCP
-                       int domainSize = 0;
-                       if(_finalMaps != null) {
-                               if(_finalMaps.get(colID) != null)
-                                       domainSize = 
_finalMaps.get(colID).size();
-                       }
-                       else {
-                               if(_finalMapsCP.get(colID) != null)
-                                       domainSize = 
_finalMapsCP.get(colID).size();
-                       }
-                       
-                       if ( domainSize != 0 ) {
-                               // dummycoded column
-                               _domainSizes[i] = domainSize;
-                       }
-                       else {
-                               // binned column
-                               if ( _binList != null )
-                               for(int j=0; j<_binList.length; j++) {
-                                       if (colID == _binList[j]) {
-                                               _domainSizes[i] = _numBins[j];
-                                               break;
-                                       }
-                               }
-                       }
-                       _dummycodedLength += _domainSizes[i]-1;
-               }
-       }
-
-
-       @Override
-       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               return apply(in, out);
-       }
-
-       @Override
-       public void build(FrameBlock in) {
-               //do nothing
-       }
-       
-       /**
-        * Method to apply transformations.
-        * 
-        * @param words array of strings
-        * @return array of transformed strings
-        */
-       @Override
-       public String[] apply(String[] words) 
-       {
-               if( !isApplicable() )
-                       return words;
-               
-               String[] nwords = new String[(int)_dummycodedLength];
-               int rcdVal = 0;
-               
-               for(int colID=1, idx=0, ncolID=1; colID <= words.length; 
colID++) {
-                       if(idx < _colList.length && colID==_colList[idx]) {
-                               // dummycoded columns
-                               try {
-                                       rcdVal = 
UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
-                                       nwords[ ncolID-1+rcdVal-1 ] = "1";
-                                       ncolID += _domainSizes[idx];
-                                       idx++;
-                               } 
-                               catch (Exception e) {
-                                       throw new RuntimeException("Error in 
dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] 
-                                                       + ", domainSize=" + 
_domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
-                               }
-                       }
-                       else {
-                               nwords[ncolID-1] = words[colID-1];
-                               ncolID++;
-                       }
-               }
-               
-               return nwords;
-       }
-       
-       @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out) 
-       {
-               MatrixBlock ret = new MatrixBlock(out.getNumRows(), 
(int)_dummycodedLength, false);
-               
-               for( int i=0; i<out.getNumRows(); i++ ) {
-                       for(int colID=1, idx=0, ncolID=1; colID <= 
out.getNumColumns(); colID++) {
-                               double val = out.quickGetValue(i, colID-1);
-                               if(idx < _colList.length && 
colID==_colList[idx]) {
-                                       ret.quickSetValue(i, 
ncolID-1+(int)val-1, 1);
-                                       ncolID += _domainSizes[idx];
-                                       idx++;
-                               }
-                               else {
-                                       double ptval = 
UtilFunctions.objectToDouble(in.getSchema()[colID-1], in.get(i, colID-1));
-                                       ret.quickSetValue(i, ncolID-1, ptval);
-                                       ncolID++;
-                               }
-                       }
-               }
-               
-               return ret;
-       }
-
-       @Override
-       public FrameBlock getMetaData(FrameBlock out) {
-               return out;
-       }
-       
-       @Override
-       public void initMetaData(FrameBlock meta) {
-               //initialize domain sizes and output num columns
-               _domainSizes = new int[_colList.length];
-               _dummycodedLength = _clen;
-               for( int j=0; j<_colList.length; j++ ) {
-                       int colID = _colList[j]; //1-based
-                       _domainSizes[j] = 
(int)meta.getColumnMetadata()[colID-1].getNumDistinct();
-                       _dummycodedLength +=  _domainSizes[j]-1;
-               }
-       }
-}
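As a reference for the deleted DummycodeAgent.apply(String[]): each dummycoded column with domain size d expands into d output columns, with a single "1" written at the offset given by the recoded value, while all other columns pass through unchanged, so the output width is the input width plus the sum of (domainSize - 1) over dummycoded columns. A minimal sketch of that expansion follows; the DummycodeSketch class and its hard-coded example are illustrative, not from this commit.

import java.util.Arrays;

public class DummycodeSketch {
    /**
     * Expands recoded columns into one-hot blocks. colList holds 1-based IDs of
     * dummycoded columns (ascending); domainSizes[i] is the domain size of colList[i].
     */
    static String[] dummycode(String[] words, int[] colList, int[] domainSizes, int outLen) {
        String[] nwords = new String[outLen];
        for (int colID = 1, idx = 0, ncolID = 1; colID <= words.length; colID++) {
            if (idx < colList.length && colID == colList[idx]) {
                int rcdVal = Integer.parseInt(words[colID - 1]); // recoded value in 1..domainSize
                nwords[ncolID - 1 + rcdVal - 1] = "1";           // one-hot position within the block
                ncolID += domainSizes[idx];
                idx++;
            } else {
                nwords[ncolID - 1] = words[colID - 1];           // pass-through column
                ncolID++;
            }
        }
        return nwords;
    }

    public static void main(String[] args) {
        // column 2 is recoded with a domain of 3 values; output has 1 + 3 = 4 columns
        String[] row = { "7.5", "2" };
        System.out.println(Arrays.toString(dummycode(row, new int[]{2}, new int[]{3}, 4)));
        // prints: [7.5, null, 1, null]
    }
}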

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java 
b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
deleted file mode 100644
index 1e45036..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-
-
-public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{
-       
-       private OutputCollector<IntWritable, DistinctValue> _collector = null; 
-       private int _mapTaskID = -1;
-       
-       TfUtils _agents = null;
-
-       private boolean _partFileWithHeader = false;
-       private boolean _firstRecordInSplit = true;
-       private String _partFileName = null;
-       private long _offsetInPartFile = -1;
-       
-       // ----------------------------------------------------------------------------------------------
-       
-       /**
-        * Configure the information used in the mapper, and set up transformation agents.
-        */
-       @Override
-       public void configure(JobConf job) {
-               String[] parts = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID).split("_");
-               if ( parts[0].equalsIgnoreCase("task")) {
-                       _mapTaskID = Integer.parseInt(parts[parts.length-1]);
-               }
-               else if ( parts[0].equalsIgnoreCase("attempt")) {
-                       _mapTaskID = Integer.parseInt(parts[parts.length-2]);
-               }
-               else {
-                       throw new RuntimeException("Unrecognized format for taskID: " + job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID));
-               }
-
-               try {
-                       _partFileName = TfUtils.getPartFileName(job);
-                       _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-                       _agents = new TfUtils(job);
-               } catch(IOException e) { throw new RuntimeException(e); }
-                 catch(JSONException e)  { throw new RuntimeException(e); }
-
-       }
-       
-       
-       public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException  {
-               
-               if(_firstRecordInSplit)
-               {
-                       _firstRecordInSplit = false;
-                       _collector = out;
-                       _offsetInPartFile = rawKey.get();
-               }
-               
-               // ignore header
-               if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader)
-                       return;
-               
-               _agents.prepareTfMtd(rawValue.toString());
-       }
-
-       @Override
-       public void close() throws IOException {
-               _agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
-               _agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
-               _agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents);
-               
-               // Output part-file offsets to create OFFSETS_FILE, which is to be used in csv reblocking.
-               // OffsetCount is denoted as a DistinctValue by concatenating the part-file name and the offset within the part file.
-               if(_collector != null) {
-                       IntWritable key = new IntWritable((int)_agents.getNumCols()+1);
-                       DistinctValue val = new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid()));
-                       _collector.collect(key, val);
-               }
-               
-               // reset global variables, required when the jvm is reused.
-               _firstRecordInSplit = true;
-               _offsetInPartFile = -1;
-               _partFileWithHeader = false;
-       }
-       
-}
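
One detail worth calling out in the removed mapper is how configure() derives the map-task number from the Hadoop task attempt string: IDs of the form task_<jt>_<job>_m_<tasknum> carry the number in the last token, while attempt_<jt>_<job>_m_<tasknum>_<attempt> carries it in the second-to-last. A self-contained sketch of that parsing (the helper and class names are invented here) would be:

// Illustration only: extract the map-task number from an MR task/attempt ID,
// mirroring the branching in the deleted configure() above.
public class TaskIdSketch {
    static int parseMapTaskID(String id) {
        String[] parts = id.split("_");
        if (parts[0].equalsIgnoreCase("task"))
            return Integer.parseInt(parts[parts.length - 1]);      // task_..._m_000005
        else if (parts[0].equalsIgnoreCase("attempt"))
            return Integer.parseInt(parts[parts.length - 2]);      // attempt_..._m_000005_0
        throw new IllegalArgumentException("Unrecognized format for taskID: " + id);
    }

    public static void main(String[] args) {
        System.out.println(parseMapTaskID("task_200707121733_0003_m_000005"));      // 5
        System.out.println(parseMapTaskID("attempt_200707121733_0003_m_000005_0")); // 5
    }
}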

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
deleted file mode 100644
index 01fc784..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> {
-       
-       private JobConf _rJob = null;
-       TfUtils _agents = null;
-       
-       @Override
-       public void configure(JobConf job) {
-               _rJob = job;
-               
-               try {
-                       String outputDir = MRJobConfiguration.getOutputs(job)[0];
-                       _agents = new TfUtils(job, outputDir);
-               } 
-               catch(IOException e)  { throw new RuntimeException(e); }
-               catch(JSONException e)  { throw new RuntimeException(e); }
-       }
-
-       @Override
-       public void close() throws IOException {
-       }
-       
-       @Override
-       public void reduce(IntWritable key, Iterator<DistinctValue> values,
-                       OutputCollector<Text, LongWritable> output, Reporter reporter)
-                       throws IOException {
-               
-               FileSystem fs = FileSystem.get(_rJob);
-               
-               int colID = key.get();
-               
-               if(colID < 0) 
-               {
-                       // process mapper output for MV and Bin agents
-                       colID = colID*-1;
-                       _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
-               }
-               else if ( colID == _agents.getNumCols() + 1)
-               {
-                       // process mapper output for OFFSET_FILE
-                       ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
-                       while(values.hasNext())
-                               list.add(new OffsetCount(values.next().getOffsetCount()));
-                       
-                       long numTfRows = generateOffsetsFile(list);
-                       reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows);
-
-               }
-               else 
-               {
-                       // process mapper output for Recode agent
-                       _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
-               }
-               
-       }
-       
-       @SuppressWarnings({"unchecked","deprecation"})
-       private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException 
-       {
-               Collections.sort(list);
-               
-               SequenceFile.Writer writer = null;
-               long lineOffset=0;
-               try {
-                       writer = new SequenceFile.Writer(
-                               FileSystem.get(_rJob), _rJob, 
-                               new Path(_agents.getOffsetFile()+"/part-00000"), 
-                               ByteWritable.class, OffsetCount.class);
-               
-                       for(OffsetCount oc: list)
-                       {
-                               long count=oc.count;
-                               oc.count=lineOffset;
-                               writer.append(new ByteWritable((byte)0), oc);
-                               lineOffset+=count;
-                       }
-               }
-               finally {
-                       IOUtilFunctions.closeSilently(writer);
-               }
-               list.clear();
-               return lineOffset;
-       }
-       
-}
-
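
The core of the removed generateOffsetsFile() is a sort plus prefix sum: each input split reports its position and its number of valid rows, the OffsetCount records are sorted, each record's count is rewritten to the cumulative number of rows preceding that split, and the running total (the number of transformed rows) is returned for the TRANSFORMED_NUM_ROWS counter. A simplified sketch of that computation, using invented names and plain arrays in place of OffsetCount and the SequenceFile output, and sorting here simply by byte offset, is:

import java.util.Arrays;

// Illustration only: per-split row offsets via sort + prefix sum.
public class OffsetsSketch {
    // splits[i] = { byteOffsetOfSplit, validRowCountOfSplit }
    static long assignRowOffsets(long[][] splits) {
        Arrays.sort(splits, (a, b) -> Long.compare(a[0], b[0]));
        long lineOffset = 0;
        for (long[] s : splits) {
            long count = s[1];
            s[1] = lineOffset;   // replace the count with the starting row offset
            lineOffset += count;
        }
        return lineOffset;       // total number of transformed rows
    }

    public static void main(String[] args) {
        long[][] splits = { {1024, 50}, {0, 100}, {2048, 25} };
        long total = assignRowOffsets(splits);
        // sorted splits get row offsets 0, 100, 150; total = 175
        System.out.println(Arrays.deepToString(splits) + " total=" + total);
    }
}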
