http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
index 0c0d399..7fb1ccc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
@@ -1,112 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-
-public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
-       
-       boolean _firstRecordInSplit = true;
-       boolean _partFileWithHeader = false;
-       
-       TfUtils tfmapper = null;
-       Reporter _reporter = null;
-       BufferedWriter br = null;
-       JobConf _rJob = null;
-       
-       @Override
-       public void configure(JobConf job) {
-               try {
-                       _rJob = job;
-                       _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-                       tfmapper = new TfUtils(job);
-                       
-                       tfmapper.loadTfMetadata(job, true);
-                       
-               } catch (IOException e) { throw new RuntimeException(e); }
-               catch(JSONException e)  { throw new RuntimeException(e); }
-
-       }
-       
-       @Override
-       public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException  {
-               
-               if(_firstRecordInSplit)
-               {
-                       _firstRecordInSplit = false;
-                       _reporter = reporter;
-                       
-                       // generate custom output paths so that order of rows in the 
-                       // output (across part files) matches w/ that from input data set
-                       String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
-                       Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
-                       
-                       // setup the writer for mapper's output
-                       // the default part-..... files will be deleted later once the job finishes 
-                       br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true)));
-               }
-               
-               // output the header line
-               if ( rawKey.get() == 0 && _partFileWithHeader ) 
-               {
-                       _reporter = reporter;
-                       tfmapper.processHeaderLine();
-                       if ( tfmapper.hasHeader() )
-                               return;
-               }
-               
-               // parse the input line and apply transformation
-               String[] words = tfmapper.getWords(rawValue);
-               
-               if(!tfmapper.omit(words))
-               {
-                       try {
-                               words = tfmapper.apply(words);
-                               String outStr = tfmapper.checkAndPrepOutputString(words);
-                               //out.collect(NullWritable.get(), new Text(outStr));
-                               br.write(outStr + "\n");
-                       } 
-                       catch(DMLRuntimeException e) {
-                               throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
-                       }
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               if ( br != null ) 
-                       br.close();
-       }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+
+public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
+       
+       boolean _firstRecordInSplit = true;
+       boolean _partFileWithHeader = false;
+       
+       TfUtils tfmapper = null;
+       Reporter _reporter = null;
+       BufferedWriter br = null;
+       JobConf _rJob = null;
+       
+       @Override
+       public void configure(JobConf job) {
+               try {
+                       _rJob = job;
+                       _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
+                       tfmapper = new TfUtils(job);
+                       
+                       tfmapper.loadTfMetadata(job, true);
+                       
+               } catch (IOException e) { throw new RuntimeException(e); }
+               catch(JSONException e)  { throw new RuntimeException(e); }
+
+       }
+       
+       @Override
+       public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException  {
+               
+               if(_firstRecordInSplit)
+               {
+                       _firstRecordInSplit = false;
+                       _reporter = reporter;
+                       
+                       // generate custom output paths so that order of rows in the 
+                       // output (across part files) matches w/ that from input data set
+                       String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
+                       Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
+                       
+                       // setup the writer for mapper's output
+                       // the default part-..... files will be deleted later once the job finishes 
+                       br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true)));
+               }
+               
+               // output the header line
+               if ( rawKey.get() == 0 && _partFileWithHeader ) 
+               {
+                       _reporter = reporter;
+                       tfmapper.processHeaderLine();
+                       if ( tfmapper.hasHeader() )
+                               return;
+               }
+               
+               // parse the input line and apply transformation
+               String[] words = tfmapper.getWords(rawValue);
+               
+               if(!tfmapper.omit(words))
+               {
+                       try {
+                               words = tfmapper.apply(words);
+                               String outStr = tfmapper.checkAndPrepOutputString(words);
+                               //out.collect(NullWritable.get(), new Text(outStr));
+                               br.write(outStr + "\n");
+                       } 
+                       catch(DMLRuntimeException e) {
+                               throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
+                       }
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               if ( br != null ) 
+                       br.close();
+       }
+
+}
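
The mapper above bypasses the provided OutputCollector and instead opens one custom output file per input split, named via getPartFileID(), so that row order across part files matches the input CSV. Below is a minimal sketch of that per-split control flow in plain Java, assuming hypothetical stand-ins: transformRow() for TfUtils.apply()/checkAndPrepOutputString(), and a local FileWriter in place of FileSystem.create() on HDFS.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;

public class PerSplitWriterSketch {

    private BufferedWriter br;                     // opened lazily on the first record of the split
    private boolean firstRecordInSplit = true;

    // Hypothetical stand-in for TfUtils.apply() / checkAndPrepOutputString().
    private String transformRow(String line) {
        return line.trim().toLowerCase();
    }

    // offset plays the role of the LongWritable key; splitId the role of getPartFileID().
    public void map(long offset, String line, String splitId, boolean splitHasHeader) throws IOException {
        if (firstRecordInSplit) {
            firstRecordInSplit = false;
            // one output file per split, so global row order is preserved across part files
            br = new BufferedWriter(new FileWriter("transform-part-" + splitId));
        }
        if (offset == 0 && splitHasHeader)
            return;                                // the header line is consumed, not emitted
        br.write(transformRow(line) + "\n");
    }

    public void close() throws IOException {
        if (br != null)
            br.close();
    }

    public static void main(String[] args) throws IOException {
        PerSplitWriterSketch m = new PerSplitWriterSketch();
        String[] split = { "ID,Name", "1,Alice", "2,Bob" };
        long offset = 0;
        for (String line : split) {
            m.map(offset, line, "00001", true);    // first split carries the header
            offset += line.length() + 1;
        }
        m.close();
    }
}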

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
index 693d687..061f2e3 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
@@ -1,160 +1,160 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-
-public class ApplyTfCSVSPARK {
-       
-       /**
-        * Apply transformation metadata and generate the result in CSV format, as a
-        * JavaRDD of Strings.
-        */
-
-       public static JavaPairRDD<Long, String> runSparkJob(
-                       SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
-                       String tfMtdPath, String specFile, 
-                       String tmpPath, CSVFileFormatProperties prop, 
-                       int numCols, String headerLine
-               ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
-
-               // Load transformation metadata and broadcast it
-               JobConf job = new JobConf();
-               FileSystem fs = FileSystem.get(job);
-               
-               String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
-               JSONObject spec = TfUtils.readSpec(fs, specFile);
-               TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath);
-               
-               _tfmapper.loadTfMetadata();
-
-               Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
-               
-               /*
-                * Construct transformation metadata (map-side) -- the logic is similar
-                * to GTFMTDMapper
-                * 
-                * Note: The result of mapPartitionsWithIndex is cached so that the
-                * transformed data is not redundantly computed multiple times
-                */
-               JavaPairRDD<Long, String> applyRDD = inputRDD
-                               .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf),  true)
-                               .mapToPair(
-                                               new PairFunction<String,Long,String>(){
-                                                       private static final long serialVersionUID = 3868143093999082931L;
-                                                       @Override
-                                                       public Tuple2<Long, String> call(String t) throws Exception {
-                                                               return new Tuple2<Long, String>(new Long(1), t);
-                                                       }
-                                               }
-                               ).cache();
-
-               /*
-                * An action to force execution of apply()
-                * 
-                * We need to trigger the execution of this RDD so as to ensure the
-                * creation of a few metadata files (headers, dummycoded information,
-                * etc.), which are referenced in the caller function.
-                */
-               applyRDD.count();
-               
-               return applyRDD;
-       }
-
-       public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
-
-               private static final long serialVersionUID = 1496686437276906911L;
-
-               TfUtils _tfmapper = null;
-               
-               ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
-                       _tfmapper = tf.getValue();
-               }
-               
-               ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
-                       _tfmapper = tf.getValue();
-               }
-               
-               @Override
-               public Iterator<String> call(Integer partitionID,
-                               Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-                       
-                       boolean first = true;
-                       Tuple2<LongWritable, Text> rec = null;
-                       ArrayList<String> outLines = new ArrayList<String>();
-                       
-                       while(csvLines.hasNext()) {
-                               rec = csvLines.next();
-                               
-                               if (first && partitionID == 0) {
-                                       first = false;
-                                       
-                                       _tfmapper.processHeaderLine();
-                                       
-                                       if (_tfmapper.hasHeader() ) {
-                                               //outLines.add(dcdHeader); // if the header needs to be preserved in the output file
-                                               continue; 
-                                       }
-                               }
-                               
-                               // parse the input line and apply transformation
-                       
-                               String[] words = _tfmapper.getWords(rec._2());
-                               
-                               if(!_tfmapper.omit(words))
-                               {
-                                       try {
-                                               words = _tfmapper.apply(words);
-                                               String outStr = _tfmapper.checkAndPrepOutputString(words);
-                                               outLines.add(outStr);
-                                       } 
-                                       catch(DMLRuntimeException e) {
-                                               throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
-                                       }
-                               }
-                       }
-                       
-                       return outLines.iterator();
-               }
-               
-       }
-
-       
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+
+
+public class ApplyTfCSVSPARK {
+       
+       /**
+        * Apply transformation metadata and generate the result in CSV format, as a
+        * JavaRDD of Strings.
+        */
+
+       public static JavaPairRDD<Long, String> runSparkJob(
+                       SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
+                       String tfMtdPath, String specFile, 
+                       String tmpPath, CSVFileFormatProperties prop, 
+                       int numCols, String headerLine
+               ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
+
+               // Load transformation metadata and broadcast it
+               JobConf job = new JobConf();
+               FileSystem fs = FileSystem.get(job);
+               
+               String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
+               JSONObject spec = TfUtils.readSpec(fs, specFile);
+               TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath);
+               
+               _tfmapper.loadTfMetadata();
+
+               Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
+               
+               /*
+                * Construct transformation metadata (map-side) -- the logic is similar
+                * to GTFMTDMapper
+                * 
+                * Note: The result of mapPartitionsWithIndex is cached so that the
+                * transformed data is not redundantly computed multiple times
+                */
+               JavaPairRDD<Long, String> applyRDD = inputRDD
+                               .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf),  true)
+                               .mapToPair(
+                                               new PairFunction<String,Long,String>(){
+                                                       private static final long serialVersionUID = 3868143093999082931L;
+                                                       @Override
+                                                       public Tuple2<Long, String> call(String t) throws Exception {
+                                                               return new Tuple2<Long, String>(new Long(1), t);
+                                                       }
+                                               }
+                               ).cache();
+
+               /*
+                * An action to force execution of apply()
+                * 
+                * We need to trigger the execution of this RDD so as to ensure the
+                * creation of a few metadata files (headers, dummycoded information,
+                * etc.), which are referenced in the caller function.
+                */
+               applyRDD.count();
+               
+               return applyRDD;
+       }
+
+       public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
+
+               private static final long serialVersionUID = 1496686437276906911L;
+
+               TfUtils _tfmapper = null;
+               
+               ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
+                       _tfmapper = tf.getValue();
+               }
+               
+               ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
+                       _tfmapper = tf.getValue();
+               }
+               
+               @Override
+               public Iterator<String> call(Integer partitionID,
+                               Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
+                       
+                       boolean first = true;
+                       Tuple2<LongWritable, Text> rec = null;
+                       ArrayList<String> outLines = new ArrayList<String>();
+                       
+                       while(csvLines.hasNext()) {
+                               rec = csvLines.next();
+                               
+                               if (first && partitionID == 0) {
+                                       first = false;
+                                       
+                                       _tfmapper.processHeaderLine();
+                                       
+                                       if (_tfmapper.hasHeader() ) {
+                                               //outLines.add(dcdHeader); // if the header needs to be preserved in the output file
+                                               continue; 
+                                       }
+                               }
+                               
+                               // parse the input line and apply transformation
+                       
+                               String[] words = _tfmapper.getWords(rec._2());
+                               
+                               if(!_tfmapper.omit(words))
+                               {
+                                       try {
+                                               words = _tfmapper.apply(words);
+                                               String outStr = _tfmapper.checkAndPrepOutputString(words);
+                                               outLines.add(outStr);
+                                       } 
+                                       catch(DMLRuntimeException e) {
+                                               throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
+                                       }
+                               }
+                       }
+                       
+                       return outLines.iterator();
+               }
+               
+       }
+
+       
+}
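
runSparkJob() above follows a standard Spark Java pattern: transform each partition with mapPartitionsWithIndex (dropping the header line in partition 0), key every output line, cache the result, and call count() so the transformation and its side-effect metadata files are materialized before the caller continues. The sketch below reproduces that pattern on plain strings, with toUpperCase() as a hedged stand-in for TfUtils.apply(); it uses only stock Spark Java APIs and is not part of the commit.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class ApplyPatternSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"));
        JavaRDD<String> input = sc.parallelize(Arrays.asList("id,name", "1,alice", "2,bob"), 2);

        JavaPairRDD<Long, String> applied = input
                .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Iterator<String> call(Integer partitionID, Iterator<String> lines) {
                        List<String> out = new ArrayList<String>();
                        boolean first = true;
                        while (lines.hasNext()) {
                            String line = lines.next();
                            if (first && partitionID == 0) {
                                first = false;
                                continue;                  // drop the header line, as ApplyTfCSVMap does
                            }
                            out.add(line.toUpperCase());   // stand-in for the per-row transformation
                        }
                        return out.iterator();
                    }
                }, true)
                .mapToPair(new PairFunction<String, Long, String>() {
                    private static final long serialVersionUID = 2L;
                    @Override
                    public Tuple2<Long, String> call(String t) {
                        return new Tuple2<Long, String>(1L, t);
                    }
                })
                .cache();

        applied.count();   // action that forces the cached transformation to run
        sc.stop();
    }
}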

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index f08c9ff..b61c781 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -1,355 +1,355 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class BinAgent extends TransformationAgent {
-       
-       private static final long serialVersionUID = 1917445005206076078L;
-
-       public static final String MIN_PREFIX = "min";
-       public static final String MAX_PREFIX = "max";
-       public static final String NBINS_PREFIX = "nbins";
-
-       private int[] _binList = null;
-       //private byte[] _binMethodList = null; // Not used, since only equi-width is supported for now. 
-       private int[] _numBins = null;
-
-       private double[] _min=null, _max=null;  // min and max among non-missing values
-
-       private double[] _binWidths = null;             // width of a bin for each attribute
-       
-       BinAgent() { }
-       
-       BinAgent(JSONObject parsedSpec) throws JSONException {
-               
-               if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) )
-                       return;
-               
-               JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString());
-               
-               JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-               //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD);
-               JSONArray nbins = (JSONArray) obj.get(JSON_NBINS);
-                       
-               assert(attrs.size() == nbins.size());
-                       
-               _binList = new int[attrs.size()];
-               _numBins = new int[attrs.size()];
-               for(int i=0; i < _binList.length; i++) {
-                       _binList[i] = UtilFunctions.toInt(attrs.get(i));
-                       _numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
-               }
-               
-               // initialize internal transformation metadata
-               _min = new double[_binList.length];
-               Arrays.fill(_min, Double.MAX_VALUE);
-               _max = new double[_binList.length];
-               Arrays.fill(_max, -Double.MAX_VALUE);
-               
-               _binWidths = new double[_binList.length];
-       }
-       
-       public void prepare(String[] words, TfUtils agents) {
-               if ( _binList == null )
-                       return;
-               
-               for(int i=0; i <_binList.length; i++) {
-                       int colID = _binList[i];
-                       
-                       String w = null;
-                       double d = 0;
-                               
-                       // equi-width
-                       w = UtilFunctions.unquote(words[colID-1].trim());
-                       if(!agents.isNA(w)) {
-                               d = UtilFunctions.parseToDouble(w);
-                               if(d < _min[i])
-                                       _min[i] = d;
-                               if(d > _max[i])
-                                       _max[i] = d;
-                       }
-               }
-       }
-       
-       private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
-               String s =  MIN_PREFIX + Double.toString(_min[idx]);
-               return  new DistinctValue(s, -1L);
-       }
-       
-       private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
-               String s =  MAX_PREFIX + Double.toString(_max[idx]);
-               return  new DistinctValue(s, -1L);
-       }
-       
-       private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
-               String s =  NBINS_PREFIX + Double.toString(_numBins[idx]);
-               return  new DistinctValue(s, -1L);
-       }
-       
-       /**
-        * Method to output transformation metadata from the mappers. 
-        * This information is collected and merged by the reducers.
-        * 
-        * @param out
-        * @throws IOException
-        */
-       @Override
-       public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-               if ( _binList == null )
-                       return;
-               
-               try { 
-                       for(int i=0; i < _binList.length; i++) {
-                               int colID = _binList[i];
-                               IntWritable iw = new IntWritable(-colID);
-                               
-                               out.collect(iw,  prepMinOutput(i));
-                               out.collect(iw,  prepMaxOutput(i));
-                               out.collect(iw,  prepNBinsOutput(i));
-                       }
-               } catch(Exception e) {
-                       throw new IOException(e);
-               }
-       }
-       
-       public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
-               if ( _binList == null )
-                       return list;
-               
-               try { 
-                       for(int i=0; i < _binList.length; i++) {
-                               int colID = _binList[i];
-                               Integer iw = -colID;
-                               
-                               list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) );
-                               list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
-                               list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
-                       }
-               } catch(Exception e) {
-                       throw new IOException(e);
-               }
-               return list;
-       }
-
-       private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException 
-       {
-               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX);
-               BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-               br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n");
-               br.close();
-       }
-
-       /** 
-        * Method to merge map output transformation metadata.
-        * 
-        * @param values
-        * @return
-        * @throws IOException 
-        */
-       @Override
-       public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-               double min = Double.MAX_VALUE;
-               double max = -Double.MAX_VALUE;
-               int nbins = 0;
-               
-               DistinctValue val = new DistinctValue();
-               String w = null;
-               double d;
-               while(values.hasNext()) {
-                       val.reset();
-                       val = values.next();
-                       w = val.getWord();
-                       
-                       if(w.startsWith(MIN_PREFIX)) {
-                               d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
-                               if ( d < min )
-                                       min = d;
-                       }
-                       else if(w.startsWith(MAX_PREFIX)) {
-                               d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
-                               if ( d > max )
-                                       max = d;
-                       }
-                       else if (w.startsWith(NBINS_PREFIX)) {
-                               nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
-                       }
-                       else
-                               throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
-               }
-               
-               // write merged metadata
-               double binwidth = (max-min)/nbins;
-               writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
-       }
-       
-       
-       public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
-               if(_binList == null)
-                       return;
-               
-               MVImputeAgent mvagent = agents.getMVImputeAgent();
-               for(int i=0; i < _binList.length; i++) {
-                       int colID = _binList[i];
-                       
-                       // If the column is imputed with a constant, then adjust min and max based the value of the constant.
-                       if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) 
-                       {
-                               double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
-                               if ( cst < _min[i])
-                                       _min[i] = cst;
-                               if ( cst > _max[i])
-                                       _max[i] = cst;
-                       }
-                       
-                       double binwidth = (_max[i] - _min[i])/_numBins[i];
-                       writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
-               }
-       }
-       
-       // ------------------------------------------------------------------------------------------------
-
-       public int[] getBinList() { return _binList; }
-       public int[] getNumBins() { return _numBins; }
-       public double[] getMin()  { return _min; }
-       public double[] getBinWidths() { return _binWidths; }
-       
-       /**
-        * Method to load transform metadata for all attributes
-        * 
-        * @param job
-        * @throws IOException
-        */
-       @Override
-       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-               if ( _binList == null )
-                       return;
-               
-               if(fs.isDirectory(txMtdDir)) {
-                       for(int i=0; i<_binList.length;i++) {
-                               int colID = _binList[i];
-                               
-                               Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX);
-                               TfUtils.checkValidInputFile(fs, path, true); 
-                                       
-                               BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
-                               // format: colID,min,max,nbins
-                               String[] fields = br.readLine().split(TXMTD_SEP);
-                               double min = UtilFunctions.parseToDouble(fields[1]);
-                               //double max = UtilFunctions.parseToDouble(fields[2]);
-                               double binwidth = UtilFunctions.parseToDouble(fields[3]);
-                               int nbins = UtilFunctions.parseToInt(fields[4]);
-                               
-                               _numBins[i] = nbins;
-                               _min[i] = min;
-                               _binWidths[i] = binwidth; // (max-min)/nbins;
-                               
-                               br.close();
-                       }
-               }
-               else {
-                       fs.close();
-                       throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
-               }
-       }
-       
-       /**
-        * Method to apply transformations.
-        * 
-        * @param words
-        * @return
-        */
-       @Override
-       public String[] apply(String[] words, TfUtils agents) {
-               if ( _binList == null )
-                       return words;
-       
-               for(int i=0; i < _binList.length; i++) {
-                       int colID = _binList[i];
-                       
-                       try {
-                       double val = UtilFunctions.parseToDouble(words[colID-1]);
-                       int binid = 1;
-                       double tmp = _min[i] + _binWidths[i];
-                       while(val > tmp && binid < _numBins[i]) {
-                               tmp += _binWidths[i];
-                               binid++;
-                       }
-                       words[colID-1] = Integer.toString(binid);
-                       } catch(NumberFormatException e)
-                       {
-                               throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
-                       }
-               }
-               
-               return words;
-       }
-       
-       /**
-        * Check if the given column ID is subjected to this transformation.
-        * 
-        */
-       public int isBinned(int colID)
-       {
-               if(_binList == null)
-                       return -1;
-               
-               int idx = Arrays.binarySearch(_binList, colID);
-               return ( idx >= 0 ? idx : -1);
-       }
-
-
-       @Override
-       public void print() {
-               System.out.print("Binning List (Equi-width): \n    ");
-               for(int i : _binList) {
-                       System.out.print(i + " ");
-               }
-               System.out.print("\n    ");
-               for(int b : _numBins) {
-                       System.out.print(b + " ");
-               }
-               System.out.println();
-       }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.CharacterCodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class BinAgent extends TransformationAgent {
+       
+       private static final long serialVersionUID = 1917445005206076078L;
+
+       public static final String MIN_PREFIX = "min";
+       public static final String MAX_PREFIX = "max";
+       public static final String NBINS_PREFIX = "nbins";
+
+       private int[] _binList = null;
+       //private byte[] _binMethodList = null; // Not used, since only equi-width is supported for now. 
+       private int[] _numBins = null;
+
+       private double[] _min=null, _max=null;  // min and max among non-missing values
+
+       private double[] _binWidths = null;             // width of a bin for each attribute
+       
+       BinAgent() { }
+       
+       BinAgent(JSONObject parsedSpec) throws JSONException {
+               
+               if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) )
+                       return;
+               
+               JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString());
+               
+               JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+               //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD);
+               JSONArray nbins = (JSONArray) obj.get(JSON_NBINS);
+                       
+               assert(attrs.size() == nbins.size());
+                       
+               _binList = new int[attrs.size()];
+               _numBins = new int[attrs.size()];
+               for(int i=0; i < _binList.length; i++) {
+                       _binList[i] = UtilFunctions.toInt(attrs.get(i));
+                       _numBins[i] = UtilFunctions.toInt(nbins.get(i)); 
+               }
+               
+               // initialize internal transformation metadata
+               _min = new double[_binList.length];
+               Arrays.fill(_min, Double.MAX_VALUE);
+               _max = new double[_binList.length];
+               Arrays.fill(_max, -Double.MAX_VALUE);
+               
+               _binWidths = new double[_binList.length];
+       }
+       
+       public void prepare(String[] words, TfUtils agents) {
+               if ( _binList == null )
+                       return;
+               
+               for(int i=0; i <_binList.length; i++) {
+                       int colID = _binList[i];
+                       
+                       String w = null;
+                       double d = 0;
+                               
+                       // equi-width
+                       w = UtilFunctions.unquote(words[colID-1].trim());
+                       if(!agents.isNA(w)) {
+                               d = UtilFunctions.parseToDouble(w);
+                               if(d < _min[i])
+                                       _min[i] = d;
+                               if(d > _max[i])
+                                       _max[i] = d;
+                       }
+               }
+       }
+       
+       private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
+               String s =  MIN_PREFIX + Double.toString(_min[idx]);
+               return  new DistinctValue(s, -1L);
+       }
+       
+       private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
+               String s =  MAX_PREFIX + Double.toString(_max[idx]);
+               return  new DistinctValue(s, -1L);
+       }
+       
+       private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
+               String s =  NBINS_PREFIX + Double.toString(_numBins[idx]);
+               return  new DistinctValue(s, -1L);
+       }
+       
+       /**
+        * Method to output transformation metadata from the mappers. 
+        * This information is collected and merged by the reducers.
+        * 
+        * @param out
+        * @throws IOException
+        */
+       @Override
+       public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
+               if ( _binList == null )
+                       return;
+               
+               try { 
+                       for(int i=0; i < _binList.length; i++) {
+                               int colID = _binList[i];
+                               IntWritable iw = new IntWritable(-colID);
+                               
+                               out.collect(iw,  prepMinOutput(i));
+                               out.collect(iw,  prepMaxOutput(i));
+                               out.collect(iw,  prepNBinsOutput(i));
+                       }
+               } catch(Exception e) {
+                       throw new IOException(e);
+               }
+       }
+       
+       public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
+               if ( _binList == null )
+                       return list;
+               
+               try { 
+                       for(int i=0; i < _binList.length; i++) {
+                               int colID = _binList[i];
+                               Integer iw = -colID;
+                               
+                               list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) );
+                               list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
+                               list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
+                       }
+               } catch(Exception e) {
+                       throw new IOException(e);
+               }
+               return list;
+       }
+
+       private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException 
+       {
+               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX);
+               BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+               br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n");
+               br.close();
+       }
+
+       /** 
+        * Method to merge map output transformation metadata.
+        * 
+        * @param values
+        * @return
+        * @throws IOException 
+        */
+       @Override
+       public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
+               double min = Double.MAX_VALUE;
+               double max = -Double.MAX_VALUE;
+               int nbins = 0;
+               
+               DistinctValue val = new DistinctValue();
+               String w = null;
+               double d;
+               while(values.hasNext()) {
+                       val.reset();
+                       val = values.next();
+                       w = val.getWord();
+                       
+                       if(w.startsWith(MIN_PREFIX)) {
+                               d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
+                               if ( d < min )
+                                       min = d;
+                       }
+                       else if(w.startsWith(MAX_PREFIX)) {
+                               d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
+                               if ( d > max )
+                                       max = d;
+                       }
+                       else if (w.startsWith(NBINS_PREFIX)) {
+                               nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
+                       }
+                       else
+                               throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w);
+               }
+               
+               // write merged metadata
+               double binwidth = (max-min)/nbins;
+               writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
+       }
+       
+       
+       public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
+               if(_binList == null)
+                       return;
+               
+               MVImputeAgent mvagent = agents.getMVImputeAgent();
+               for(int i=0; i < _binList.length; i++) {
+                       int colID = _binList[i];
+                       
+                       // If the column is imputed with a constant, then adjust min and max based the value of the constant.
+                       if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) 
+                       {
+                               double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
+                               if ( cst < _min[i])
+                                       _min[i] = cst;
+                               if ( cst > _max[i])
+                                       _max[i] = cst;
+                       }
+                       
+                       double binwidth = (_max[i] - _min[i])/_numBins[i];
+                       writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
+               }
+       }
+       
+       // ------------------------------------------------------------------------------------------------
+
+       public int[] getBinList() { return _binList; }
+       public int[] getNumBins() { return _numBins; }
+       public double[] getMin()  { return _min; }
+       public double[] getBinWidths() { return _binWidths; }
+       
+       /**
+        * Method to load transform metadata for all attributes
+        * 
+        * @param job
+        * @throws IOException
+        */
+       @Override
+       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
+               if ( _binList == null )
+                       return;
+               
+               if(fs.isDirectory(txMtdDir)) {
+                       for(int i=0; i<_binList.length;i++) {
+                               int colID = _binList[i];
+                               
+                               Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX);
+                               TfUtils.checkValidInputFile(fs, path, true); 
+                                       
+                               BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
+                               // format: colID,min,max,nbins
+                               String[] fields = br.readLine().split(TXMTD_SEP);
+                               double min = UtilFunctions.parseToDouble(fields[1]);
+                               //double max = UtilFunctions.parseToDouble(fields[2]);
+                               double binwidth = UtilFunctions.parseToDouble(fields[3]);
+                               int nbins = UtilFunctions.parseToInt(fields[4]);
+                               
+                               _numBins[i] = nbins;
+                               _min[i] = min;
+                               _binWidths[i] = binwidth; // (max-min)/nbins;
+                               
+                               br.close();
+                       }
+               }
+               else {
+                       fs.close();
+                       throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir);
+               }
+       }
+       
+       /**
+        * Method to apply transformations.
+        * 
+        * @param words
+        * @return
+        */
+       @Override
+       public String[] apply(String[] words, TfUtils agents) {
+               if ( _binList == null )
+                       return words;
+       
+               for(int i=0; i < _binList.length; i++) {
+                       int colID = _binList[i];
+                       
+                       try {
+                       double val = 
UtilFunctions.parseToDouble(words[colID-1]);
+                       int binid = 1;
+                       double tmp = _min[i] + _binWidths[i];
+                       while(val > tmp && binid < _numBins[i]) {
+                               tmp += _binWidths[i];
+                               binid++;
+                       }
+                       words[colID-1] = Integer.toString(binid);
+                       } catch(NumberFormatException e)
+                       {
+                               throw new RuntimeException("Encountered \"" + 
words[colID-1] + "\" in column ID \"" + colID + "\" when expecting a numeric 
value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an 
appropriate imputation method.");
+                       }
+               }
+               
+               return words;
+       }
+       
+       /**
+        * Check if the given column ID is subjected to this transformation.
+        * 
+        */
+       public int isBinned(int colID)
+       {
+               if(_binList == null)
+                       return -1;
+               
+               int idx = Arrays.binarySearch(_binList, colID);
+               return ( idx >= 0 ? idx : -1);
+       }
+
+
+       @Override
+       public void print() {
+               System.out.print("Binning List (Equi-width): \n    ");
+               for(int i : _binList) {
+                       System.out.print(i + " ");
+               }
+               System.out.print("\n    ");
+               for(int b : _numBins) {
+                       System.out.print(b + " ");
+               }
+               System.out.println();
+       }
+
+}
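
For readers tracing the binning logic above: loadTxMtd() reads min, max, bin width, and the number of bins per column, and apply() walks the bin boundaries to assign a 1-based bin id, clamping values beyond the last boundary into the last bin. The following standalone sketch reproduces that assignment rule outside the agent (class and variable names are illustrative, not part of the commit):

public class EquiWidthBinSketch {

    // Same scan as BinAgent.apply(): start at the first boundary (min + width)
    // and advance until the value fits or the last bin is reached, so values
    // beyond max fall into the last bin.
    static int assignBin(double val, double min, double width, int nbins) {
        int binid = 1;
        double upper = min + width;
        while (val > upper && binid < nbins) {
            upper += width;
            binid++;
        }
        return binid;
    }

    public static void main(String[] args) {
        double min = 0.0, max = 10.0;
        int nbins = 5;
        double width = (max - min) / nbins;            // same formula as writeTfMtd above
        System.out.println(assignBin(2.5, min, width, nbins));   // -> 2
        System.out.println(assignBin(10.0, min, width, nbins));  // -> 5
        System.out.println(assignBin(42.0, min, width, nbins));  // clamped to last bin -> 5
    }
}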

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java 
b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
index d44a904..2e52657 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
@@ -1,108 +1,108 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DistinctValue implements Writable, Serializable {
-       
-       private static final long serialVersionUID = -8236705946336974836L;
-
-       private static final byte [] EMPTY_BYTES = new byte[0];
-         
-       // word (distinct value)
-       private byte[] _bytes;
-       private int _length;
-       // count
-       private long _count;
-       
-       public DistinctValue() {
-               _bytes = EMPTY_BYTES;
-               _length = 0;
-               _count = -1;
-       }
-       
-       public DistinctValue(String w, long count) throws 
CharacterCodingException {
-           ByteBuffer bb = Text.encode(w, true);
-           _bytes = bb.array();
-           _length = bb.limit();
-               _count = count;
-       }
-       
-       public DistinctValue(OffsetCount oc) throws CharacterCodingException 
-       {
-               this(oc.filename + "," + oc.fileOffset, oc.count);
-       }
-       
-       public void reset() {
-               _bytes = EMPTY_BYTES;
-               _length = 0;
-               _count = -1;
-       }
-       
-       public String getWord() {  return new String( _bytes, 0, _length, 
Charset.forName("UTF-8") ); }
-       public long getCount() { return _count; }
-       
-       @Override
-       public void write(DataOutput out) throws IOException {
-           // write word
-               WritableUtils.writeVInt(out, _length);
-           out.write(_bytes, 0, _length);
-               // write count
-           out.writeLong(_count);
-       }
-       
-       @Override
-       public void readFields(DataInput in) throws IOException {
-           // read word 
-               int newLength = WritableUtils.readVInt(in);
-           _bytes = new byte[newLength];
-           in.readFully(_bytes, 0, newLength);
-           _length = newLength;
-           if (_length != _bytes.length)
-               System.out.println("ERROR in DistinctValue.readFields()");
-           // read count
-           _count = in.readLong();
-       }
-       
-       public OffsetCount getOffsetCount() {
-               OffsetCount oc = new OffsetCount();
-               String[] parts = getWord().split(",");
-               oc.filename = parts[0];
-               oc.fileOffset = UtilFunctions.parseToLong(parts[1]);
-               oc.count = getCount();
-               
-               return oc;
-       }
-       
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class DistinctValue implements Writable, Serializable {
+       
+       private static final long serialVersionUID = -8236705946336974836L;
+
+       private static final byte [] EMPTY_BYTES = new byte[0];
+         
+       // word (distinct value)
+       private byte[] _bytes;
+       private int _length;
+       // count
+       private long _count;
+       
+       public DistinctValue() {
+               _bytes = EMPTY_BYTES;
+               _length = 0;
+               _count = -1;
+       }
+       
+       public DistinctValue(String w, long count) throws 
CharacterCodingException {
+           ByteBuffer bb = Text.encode(w, true);
+           _bytes = bb.array();
+           _length = bb.limit();
+               _count = count;
+       }
+       
+       public DistinctValue(OffsetCount oc) throws CharacterCodingException 
+       {
+               this(oc.filename + "," + oc.fileOffset, oc.count);
+       }
+       
+       public void reset() {
+               _bytes = EMPTY_BYTES;
+               _length = 0;
+               _count = -1;
+       }
+       
+       public String getWord() {  return new String( _bytes, 0, _length, 
Charset.forName("UTF-8") ); }
+       public long getCount() { return _count; }
+       
+       @Override
+       public void write(DataOutput out) throws IOException {
+           // write word
+               WritableUtils.writeVInt(out, _length);
+           out.write(_bytes, 0, _length);
+               // write count
+           out.writeLong(_count);
+       }
+       
+       @Override
+       public void readFields(DataInput in) throws IOException {
+           // read word 
+               int newLength = WritableUtils.readVInt(in);
+           _bytes = new byte[newLength];
+           in.readFully(_bytes, 0, newLength);
+           _length = newLength;
+           if (_length != _bytes.length)
+               System.out.println("ERROR in DistinctValue.readFields()");
+           // read count
+           _count = in.readLong();
+       }
+       
+       public OffsetCount getOffsetCount() {
+               OffsetCount oc = new OffsetCount();
+               String[] parts = getWord().split(",");
+               oc.filename = parts[0];
+               oc.fileOffset = UtilFunctions.parseToLong(parts[1]);
+               oc.count = getCount();
+               
+               return oc;
+       }
+       
+}
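
DistinctValue serializes the word as a VInt-prefixed UTF-8 byte array followed by an 8-byte count, so a write()/readFields() pair over plain java.io streams round-trips the value. A minimal sketch, assuming the class above is on the classpath; the word and count are invented for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.sysml.runtime.transform.DistinctValue;

public class DistinctValueRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize: VInt length, UTF-8 bytes of the word, then the long count
        DistinctValue dv = new DistinctValue("category_A", 42L);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        dv.write(new DataOutputStream(bos));

        // deserialize into a fresh instance and check the fields survive
        DistinctValue copy = new DistinctValue();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy.getWord() + " -> " + copy.getCount());  // category_A -> 42
    }
}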

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
index a1c76ba..079ad58 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
@@ -1,426 +1,426 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import com.google.common.base.Functions;
-import com.google.common.collect.Ordering;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DummycodeAgent extends TransformationAgent {      
-       
-       private static final long serialVersionUID = 5832130477659116489L;
-
-       private int[] _dcdList = null;
-       private long numCols = 0;
-       
-       private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
-       private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
-       private int[] _binList = null;
-       private int[] _numBins = null;
-       
-       private int[] _domainSizes = null;                      // length = #of 
dummycoded columns
-       private int[] _dcdColumnMap = null;                     // to help in 
translating between original and dummycoded column IDs
-       private long _dummycodedLength = 0;                     // #of columns 
after dummycoded
-       
-       DummycodeAgent(int[] list) {
-               _dcdList = list;
-       }
-       
-       DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException {
-               numCols = ncol;
-               
-               if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
-                       return;
-               
-               JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.DUMMYCODE.toString());
-               JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
-                       
-               _dcdList = new int[attrs.size()];
-               for(int i=0; i < _dcdList.length; i++) 
-                       _dcdList[i] = UtilFunctions.toInt(attrs.get(i));
-       }
-       
-       public int[] dcdList() {
-               return _dcdList;
-       }
-       
-       /**
-        * Method to output transformation metadata from the mappers. 
-        * This information is collected and merged by the reducers.
-        * 
-        * @param out
-        * @throws IOException
-        * 
-        */
-       @Override
-       public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException {
-               // There is no metadata required for dummycode.
-               // Required information is output from RecodeAgent.
-               return;
-       }
-       
-       @Override
-       public void 
mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values,
-                       String outputDir, int colID, FileSystem fs, TfUtils 
agents) throws IOException {
-               // Nothing to do here
-       }
-
-       public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> 
maps) {
-               _finalMaps = maps;
-       }
-       
-       public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> 
maps) {
-               _finalMapsCP = maps;
-       }
-       
-       public void setNumBins(int[] binList, int[] numbins) {
-               _binList = binList;
-               _numBins = numbins;
-       }
-       
-       /**
-        * Method to generate dummyCodedMaps.csv, with the range of column IDs 
for each variable in the original data.
-        * 
-        * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, 
st, end]
-        *              1/0 indicates if ColID is dummycoded or not
-        *              [st,end] is the range of dummycoded column numbers for 
the given ColID
-        * 
-        * It also generates coltypes.csv, with the type (scale, nominal, etc.) 
of columns in the output.
-        * Recoded columns are of type nominal, binner columns are of type 
ordinal, dummycoded columns are of type 
-        * dummycoded, and the remaining are of type scale.
-        * 
-        * @param fs
-        * @param txMtdDir
-        * @param numCols
-        * @param ra
-        * @param ba
-        * @return Number of columns in the transformed data
-        * @throws IOException
-        */
-       public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int 
numCols, TfUtils agents) throws IOException {
-               
-               // initialize all column types in the transformed data to SCALE
-               ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength];
-               for(int i=0; i < _dummycodedLength; i++)
-                       ctypes[i] = ColumnTypes.SCALE;
-               
-               _dcdColumnMap = new int[numCols];
-
-               Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME);
-               BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
-               
-               int sum=1;
-               int idx = 0;
-               for(int colID=1; colID <= numCols; colID++) 
-               {
-                       if ( _dcdList != null && idx < _dcdList.length && 
_dcdList[idx] == colID )
-                       {
-                               br.write(colID + "," + "1" + "," + sum + "," + 
(sum+_domainSizes[idx]-1) + "\n");
-                               _dcdColumnMap[colID-1] = 
(sum+_domainSizes[idx]-1)-1;
-
-                               for(int i=sum; i <=(sum+_domainSizes[idx]-1); 
i++)
-                                       ctypes[i-1] = ColumnTypes.DUMMYCODED;
-                               
-                               sum += _domainSizes[idx];
-                               idx++;
-                       }
-                       else 
-                       {
-                               br.write(colID + "," + "0" + "," + sum + "," + 
sum + "\n");
-                               _dcdColumnMap[colID-1] = sum-1;
-                               
-                               if ( agents.getBinAgent().isBinned(colID) != -1 
)
-                                       ctypes[sum-1] = ColumnTypes.ORDINAL;    
// binned variable results in an ordinal column
-                               
-                               if ( agents.getRecodeAgent().isRecoded(colID) 
!= -1 )
-                                       ctypes[sum-1] = ColumnTypes.NOMINAL;
-                               
-                               sum += 1;
-                       }
-               }
-               br.close();
-
-               // Write coltypes.csv
-               pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME);
-               br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
-               
-               br.write(columnTypeToID(ctypes[0]) + "");
-               for(int i = 1; i < _dummycodedLength; i++) 
-                       br.write( "," + columnTypeToID(ctypes[i]));
-               br.close();
-               
-               return sum-1;
-       }
-       
-       /**
-        * Given a dummycoded column id, find the corresponding original column 
ID.
-        *  
-        * @param colID
-        * @return
-        */
-       public int mapDcdColumnID(int colID) 
-       {
-               for(int i=0; i < _dcdColumnMap.length; i++)
-               {
-                       int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1);
-                       int end = _dcdColumnMap[i]+1;
-                       //System.out.println((i+1) + ": " + "[" + st + "," + 
end + "]");
-                       
-                       if ( colID >= st && colID <= end)
-                               return i+1;
-               }
-               return -1;
-       }
-       
-       public String constructDummycodedHeader(String header, Pattern delim) {
-               
-               if(_dcdList == null && _binList == null )
-                       // none of the columns are dummycoded, simply return 
the given header
-                       return header;
-               
-               String[] names = delim.split(header, -1);
-               List<String> newNames = null;
-               
-               StringBuilder sb = new StringBuilder();
-               
-               // Dummycoding can be performed on either on a recoded column 
or on a binned column
-               
-               // process recoded columns
-               if(_finalMapsCP != null && _dcdList != null) 
-               {
-                       for(int i=0; i <_dcdList.length; i++) 
-                       {
-                               int colID = _dcdList[i];
-                               HashMap<String,Long> map = 
_finalMapsCP.get(colID);
-                               String colName = 
UtilFunctions.unquote(names[colID-1]);
-                               
-                               if ( map != null  ) 
-                               {
-                                       // order map entries by their recodeID
-                                       Ordering<String> valueComparator = 
Ordering.natural().onResultOf(Functions.forMap(map));
-                                       newNames = 
valueComparator.sortedCopy(map.keySet());
-                                       
-                                       // construct concatenated string of map 
entries
-                                       sb.setLength(0);
-                                       for(int idx=0; idx < newNames.size(); 
idx++) 
-                                       {
-                                               if(idx==0) 
-                                                       sb.append( colName + 
DCD_NAME_SEP + newNames.get(idx));
-                                               else
-                                                       sb.append( delim + 
colName + DCD_NAME_SEP + newNames.get(idx));
-                                       }
-                                       names[colID-1] = sb.toString();         
        // replace original column name with dcd name
-                               }
-                       }
-               }
-               else if(_finalMaps != null && _dcdList != null) {
-                       for(int i=0; i <_dcdList.length; i++) {
-                               int colID = _dcdList[i];
-                               HashMap<String,String> map = 
_finalMaps.get(colID);
-                               String colName = 
UtilFunctions.unquote(names[colID-1]);
-                               
-                               if ( map != null ) 
-                               {
-                                       // order map entries by their recodeID 
(represented as Strings .. "1", "2", etc.)
-                                       Ordering<String> orderByID = new 
Ordering<String>() 
-                                       {
-                                       public int compare(String s1, String 
s2) {
-                                               return (Integer.parseInt(s1) - 
Integer.parseInt(s2));
-                                       }
-                                       };
-                                       
-                                       newNames = 
orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet());
-                                       // construct concatenated string of map 
entries
-                                       sb.setLength(0);
-                                       for(int idx=0; idx < newNames.size(); 
idx++) 
-                                       {
-                                               if(idx==0) 
-                                                       sb.append( colName + 
DCD_NAME_SEP + newNames.get(idx));
-                                               else
-                                                       sb.append( delim + 
colName + DCD_NAME_SEP + newNames.get(idx));
-                                       }
-                                       names[colID-1] = sb.toString();         
        // replace original column name with dcd name
-                               }
-                       }
-               }
-               
-               // process binned columns
-               if (_binList != null) 
-                       for(int i=0; i < _binList.length; i++) 
-                       {
-                               int colID = _binList[i];
-                               
-                               // need to consider only binned and dummycoded 
columns
-                               if(isDummyCoded(colID) == -1)
-                                       continue;
-                               
-                               int numBins = _numBins[i];
-                               String colName = 
UtilFunctions.unquote(names[colID-1]);
-                               
-                               sb.setLength(0);
-                               for(int idx=0; idx < numBins; idx++) 
-                                       if(idx==0) 
-                                               sb.append( colName + 
DCD_NAME_SEP + "Bin" + (idx+1) );
-                                       else
-                                               sb.append( delim + colName + 
DCD_NAME_SEP + "Bin" + (idx+1) );
-                               names[colID-1] = sb.toString();                 
// replace original column name with dcd name
-                       }
-               
-               // Construct the full header
-               sb.setLength(0);
-               for(int colID=0; colID < names.length; colID++) 
-               {
-                       if (colID == 0)
-                               sb.append(names[colID]);
-                       else
-                               sb.append(delim + names[colID]);
-               }
-               //System.out.println("DummycodedHeader: " + sb.toString());
-               
-               return sb.toString();
-       }
-       
-       @Override
-       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
-               if ( _dcdList == null )
-               {
-                       _dummycodedLength = numCols;
-                       return;
-               }
-               
-               // sort to-be dummycoded column IDs in ascending order. This is 
the order in which the new dummycoded record is constructed in apply() function.
-               Arrays.sort(_dcdList);  
-               _domainSizes = new int[_dcdList.length];
-
-               _dummycodedLength = numCols;
-               
-               //HashMap<String, String> map = null;
-               for(int i=0; i<_dcdList.length; i++) {
-                       int colID = _dcdList[i];
-                       
-                       // Find the domain size for colID using _finalMaps or 
_finalMapsCP
-                       int domainSize = 0;
-                       if(_finalMaps != null) {
-                               if(_finalMaps.get(colID) != null)
-                                       domainSize = 
_finalMaps.get(colID).size();
-                       }
-                       else {
-                               if(_finalMapsCP.get(colID) != null)
-                                       domainSize = 
_finalMapsCP.get(colID).size();
-                       }
-                       
-                       if ( domainSize != 0 ) {
-                               // dummycoded column
-                               _domainSizes[i] = domainSize;
-                       }
-                       else {
-                               // binned column
-                               if ( _binList != null )
-                               for(int j=0; j<_binList.length; j++) {
-                                       if (colID == _binList[j]) {
-                                               _domainSizes[i] = _numBins[j];
-                                               break;
-                                       }
-                               }
-                       }
-                       _dummycodedLength += _domainSizes[i]-1;
-                       //System.out.println("colID=" + colID + ", domainsize=" 
+ _domainSizes[i] + ", dcdLength=" + _dummycodedLength);
-               }
-       }
-
-       /**
-        * Method to apply transformations.
-        * 
-        * @param words
-        * @return
-        */
-       @Override
-       public String[] apply(String[] words, TfUtils agents) {
-               
-               if ( _dcdList == null )
-                       return words;
-               
-               String[] nwords = new String[(int)_dummycodedLength];
-               
-               int rcdVal = 0;
-               
-               for(int colID=1, idx=0, ncolID=1; colID <= words.length; 
colID++) {
-                       if(idx < _dcdList.length && colID==_dcdList[idx]) {
-                               // dummycoded columns
-                               try {
-                               rcdVal = 
UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
-                               nwords[ ncolID-1+rcdVal-1 ] = "1";
-                               ncolID += _domainSizes[idx];
-                               idx++;
-                               } catch (Exception e) {
-                                       System.out.println("Error in 
dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", 
domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
-                                       throw new RuntimeException(e);
-                               }
-                       }
-                       else {
-                               nwords[ncolID-1] = words[colID-1];
-                               ncolID++;
-                       }
-               }
-               
-               return nwords;
-       }
-       
-       /**
-        * Check if the given column ID is subjected to this transformation.
-        * 
-        */
-       public int isDummyCoded(int colID)
-       {
-               if(_dcdList == null)
-                       return -1;
-               
-               int idx = Arrays.binarySearch(_dcdList, colID);
-               return ( idx >= 0 ? idx : -1);
-       }
-       
-       @Override
-       public void print() {
-               System.out.print("Dummycoding List: \n    ");
-               for(int i : _dcdList) {
-                       System.out.print(i + " ");
-               }
-               System.out.println();
-       }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.wink.json4j.JSONArray;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import com.google.common.base.Functions;
+import com.google.common.collect.Ordering;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+public class DummycodeAgent extends TransformationAgent {      
+       
+       private static final long serialVersionUID = 5832130477659116489L;
+
+       private int[] _dcdList = null;
+       private long numCols = 0;
+       
+       private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+       private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null;
+       private int[] _binList = null;
+       private int[] _numBins = null;
+       
+       private int[] _domainSizes = null;                      // length = #of 
dummycoded columns
+       private int[] _dcdColumnMap = null;                     // to help in 
translating between original and dummycoded column IDs
+       private long _dummycodedLength = 0;                     // #of columns 
after dummycoded
+       
+       DummycodeAgent(int[] list) {
+               _dcdList = list;
+       }
+       
+       DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException {
+               numCols = ncol;
+               
+               if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) )
+                       return;
+               
+               JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.DUMMYCODE.toString());
+               JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+                       
+               _dcdList = new int[attrs.size()];
+               for(int i=0; i < _dcdList.length; i++) 
+                       _dcdList[i] = UtilFunctions.toInt(attrs.get(i));
+       }
+       
+       public int[] dcdList() {
+               return _dcdList;
+       }
+       
+       /**
+        * Method to output transformation metadata from the mappers. 
+        * This information is collected and merged by the reducers.
+        * 
+        * @param out
+        * @throws IOException
+        * 
+        */
+       @Override
+       public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException {
+               // There is no metadata required for dummycode.
+               // Required information is output from RecodeAgent.
+               return;
+       }
+       
+       @Override
+       public void 
mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values,
+                       String outputDir, int colID, FileSystem fs, TfUtils 
agents) throws IOException {
+               // Nothing to do here
+       }
+
+       public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> 
maps) {
+               _finalMaps = maps;
+       }
+       
+       public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> 
maps) {
+               _finalMapsCP = maps;
+       }
+       
+       public void setNumBins(int[] binList, int[] numbins) {
+               _binList = binList;
+               _numBins = numbins;
+       }
+       
+       /**
+        * Method to generate dummyCodedMaps.csv, with the range of column IDs 
for each variable in the original data.
+        * 
+        * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, 
st, end]
+        *              1/0 indicates if ColID is dummycoded or not
+        *              [st,end] is the range of dummycoded column numbers for 
the given ColID
+        * 
+        * It also generates coltypes.csv, with the type (scale, nominal, etc.) 
of columns in the output.
+        * Recoded columns are of type nominal, binner columns are of type 
ordinal, dummycoded columns are of type 
+        * dummycoded, and the remaining are of type scale.
+        * 
+        * @param fs
+        * @param txMtdDir
+        * @param numCols
+        * @param ra
+        * @param ba
+        * @return Number of columns in the transformed data
+        * @throws IOException
+        */
+       public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int 
numCols, TfUtils agents) throws IOException {
+               
+               // initialize all column types in the transformed data to SCALE
+               ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength];
+               for(int i=0; i < _dummycodedLength; i++)
+                       ctypes[i] = ColumnTypes.SCALE;
+               
+               _dcdColumnMap = new int[numCols];
+
+               Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME);
+               BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
+               
+               int sum=1;
+               int idx = 0;
+               for(int colID=1; colID <= numCols; colID++) 
+               {
+                       if ( _dcdList != null && idx < _dcdList.length && 
_dcdList[idx] == colID )
+                       {
+                               br.write(colID + "," + "1" + "," + sum + "," + 
(sum+_domainSizes[idx]-1) + "\n");
+                               _dcdColumnMap[colID-1] = 
(sum+_domainSizes[idx]-1)-1;
+
+                               for(int i=sum; i <=(sum+_domainSizes[idx]-1); 
i++)
+                                       ctypes[i-1] = ColumnTypes.DUMMYCODED;
+                               
+                               sum += _domainSizes[idx];
+                               idx++;
+                       }
+                       else 
+                       {
+                               br.write(colID + "," + "0" + "," + sum + "," + 
sum + "\n");
+                               _dcdColumnMap[colID-1] = sum-1;
+                               
+                               if ( agents.getBinAgent().isBinned(colID) != -1 
)
+                                       ctypes[sum-1] = ColumnTypes.ORDINAL;    
// binned variable results in an ordinal column
+                               
+                               if ( agents.getRecodeAgent().isRecoded(colID) 
!= -1 )
+                                       ctypes[sum-1] = ColumnTypes.NOMINAL;
+                               
+                               sum += 1;
+                       }
+               }
+               br.close();
+
+               // Write coltypes.csv
+               pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME);
+               br=new BufferedWriter(new 
OutputStreamWriter(fs.create(pt,true)));
+               
+               br.write(columnTypeToID(ctypes[0]) + "");
+               for(int i = 1; i < _dummycodedLength; i++) 
+                       br.write( "," + columnTypeToID(ctypes[i]));
+               br.close();
+               
+               return sum-1;
+       }
+       
+       /**
+        * Given a dummycoded column id, find the corresponding original column 
ID.
+        *  
+        * @param colID
+        * @return
+        */
+       public int mapDcdColumnID(int colID) 
+       {
+               for(int i=0; i < _dcdColumnMap.length; i++)
+               {
+                       int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1);
+                       int end = _dcdColumnMap[i]+1;
+                       //System.out.println((i+1) + ": " + "[" + st + "," + 
end + "]");
+                       
+                       if ( colID >= st && colID <= end)
+                               return i+1;
+               }
+               return -1;
+       }
+       
+       public String constructDummycodedHeader(String header, Pattern delim) {
+               
+               if(_dcdList == null && _binList == null )
+                       // none of the columns are dummycoded, simply return 
the given header
+                       return header;
+               
+               String[] names = delim.split(header, -1);
+               List<String> newNames = null;
+               
+               StringBuilder sb = new StringBuilder();
+               
+               // Dummycoding can be performed on either on a recoded column 
or on a binned column
+               
+               // process recoded columns
+               if(_finalMapsCP != null && _dcdList != null) 
+               {
+                       for(int i=0; i <_dcdList.length; i++) 
+                       {
+                               int colID = _dcdList[i];
+                               HashMap<String,Long> map = 
_finalMapsCP.get(colID);
+                               String colName = 
UtilFunctions.unquote(names[colID-1]);
+                               
+                               if ( map != null  ) 
+                               {
+                                       // order map entries by their recodeID
+                                       Ordering<String> valueComparator = 
Ordering.natural().onResultOf(Functions.forMap(map));
+                                       newNames = 
valueComparator.sortedCopy(map.keySet());
+                                       
+                                       // construct concatenated string of map 
entries
+                                       sb.setLength(0);
+                                       for(int idx=0; idx < newNames.size(); 
idx++) 
+                                       {
+                                               if(idx==0) 
+                                                       sb.append( colName + 
DCD_NAME_SEP + newNames.get(idx));
+                                               else
+                                                       sb.append( delim + 
colName + DCD_NAME_SEP + newNames.get(idx));
+                                       }
+                                       names[colID-1] = sb.toString();         
        // replace original column name with dcd name
+                               }
+                       }
+               }
+               else if(_finalMaps != null && _dcdList != null) {
+                       for(int i=0; i <_dcdList.length; i++) {
+                               int colID = _dcdList[i];
+                               HashMap<String,String> map = 
_finalMaps.get(colID);
+                               String colName = 
UtilFunctions.unquote(names[colID-1]);
+                               
+                               if ( map != null ) 
+                               {
+                                       // order map entries by their recodeID 
(represented as Strings .. "1", "2", etc.)
+                                       Ordering<String> orderByID = new 
Ordering<String>() 
+                                       {
+                                       public int compare(String s1, String 
s2) {
+                                               return (Integer.parseInt(s1) - 
Integer.parseInt(s2));
+                                       }
+                                       };
+                                       
+                                       newNames = 
orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet());
+                                       // construct concatenated string of map 
entries
+                                       sb.setLength(0);
+                                       for(int idx=0; idx < newNames.size(); 
idx++) 
+                                       {
+                                               if(idx==0) 
+                                                       sb.append( colName + 
DCD_NAME_SEP + newNames.get(idx));
+                                               else
+                                                       sb.append( delim + 
colName + DCD_NAME_SEP + newNames.get(idx));
+                                       }
+                                       names[colID-1] = sb.toString();         
        // replace original column name with dcd name
+                               }
+                       }
+               }
+               
+               // process binned columns
+               if (_binList != null) 
+                       for(int i=0; i < _binList.length; i++) 
+                       {
+                               int colID = _binList[i];
+                               
+                               // need to consider only binned and dummycoded 
columns
+                               if(isDummyCoded(colID) == -1)
+                                       continue;
+                               
+                               int numBins = _numBins[i];
+                               String colName = 
UtilFunctions.unquote(names[colID-1]);
+                               
+                               sb.setLength(0);
+                               for(int idx=0; idx < numBins; idx++) 
+                                       if(idx==0) 
+                                               sb.append( colName + 
DCD_NAME_SEP + "Bin" + (idx+1) );
+                                       else
+                                               sb.append( delim + colName + 
DCD_NAME_SEP + "Bin" + (idx+1) );
+                               names[colID-1] = sb.toString();                 
// replace original column name with dcd name
+                       }
+               
+               // Construct the full header
+               sb.setLength(0);
+               for(int colID=0; colID < names.length; colID++) 
+               {
+                       if (colID == 0)
+                               sb.append(names[colID]);
+                       else
+                               sb.append(delim + names[colID]);
+               }
+               //System.out.println("DummycodedHeader: " + sb.toString());
+               
+               return sb.toString();
+       }
+       
+       @Override
+       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
+               if ( _dcdList == null )
+               {
+                       _dummycodedLength = numCols;
+                       return;
+               }
+               
+               // sort to-be dummycoded column IDs in ascending order. This is 
the order in which the new dummycoded record is constructed in apply() function.
+               Arrays.sort(_dcdList);  
+               _domainSizes = new int[_dcdList.length];
+
+               _dummycodedLength = numCols;
+               
+               //HashMap<String, String> map = null;
+               for(int i=0; i<_dcdList.length; i++) {
+                       int colID = _dcdList[i];
+                       
+                       // Find the domain size for colID using _finalMaps or 
_finalMapsCP
+                       int domainSize = 0;
+                       if(_finalMaps != null) {
+                               if(_finalMaps.get(colID) != null)
+                                       domainSize = 
_finalMaps.get(colID).size();
+                       }
+                       else {
+                               if(_finalMapsCP.get(colID) != null)
+                                       domainSize = 
_finalMapsCP.get(colID).size();
+                       }
+                       
+                       if ( domainSize != 0 ) {
+                               // dummycoded column
+                               _domainSizes[i] = domainSize;
+                       }
+                       else {
+                               // binned column
+                               if ( _binList != null )
+                               for(int j=0; j<_binList.length; j++) {
+                                       if (colID == _binList[j]) {
+                                               _domainSizes[i] = _numBins[j];
+                                               break;
+                                       }
+                               }
+                       }
+                       _dummycodedLength += _domainSizes[i]-1;
+                       //System.out.println("colID=" + colID + ", domainsize=" 
+ _domainSizes[i] + ", dcdLength=" + _dummycodedLength);
+               }
+       }
+
+       /**
+        * Method to apply transformations.
+        * 
+        * @param words
+        * @return
+        */
+       @Override
+       public String[] apply(String[] words, TfUtils agents) {
+               
+               if ( _dcdList == null )
+                       return words;
+               
+               String[] nwords = new String[(int)_dummycodedLength];
+               
+               int rcdVal = 0;
+               
+               for(int colID=1, idx=0, ncolID=1; colID <= words.length; 
colID++) {
+                       if(idx < _dcdList.length && colID==_dcdList[idx]) {
+                               // dummycoded columns
+                               try {
+                               rcdVal = 
UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1]));
+                               nwords[ ncolID-1+rcdVal-1 ] = "1";
+                               ncolID += _domainSizes[idx];
+                               idx++;
+                               } catch (Exception e) {
+                                       System.out.println("Error in 
dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", 
domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength);
+                                       throw new RuntimeException(e);
+                               }
+                       }
+                       else {
+                               nwords[ncolID-1] = words[colID-1];
+                               ncolID++;
+                       }
+               }
+               
+               return nwords;
+       }
+       
+       /**
+        * Check if the given column ID is subjected to this transformation.
+        * 
+        */
+       public int isDummyCoded(int colID)
+       {
+               if(_dcdList == null)
+                       return -1;
+               
+               int idx = Arrays.binarySearch(_dcdList, colID);
+               return ( idx >= 0 ? idx : -1);
+       }
+       
+       @Override
+       public void print() {
+               System.out.print("Dummycoding List: \n    ");
+               for(int i : _dcdList) {
+                       System.out.print(i + " ");
+               }
+               System.out.println();
+       }
+
+}
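
DummycodeAgent.apply() above expands each dummycoded column into a block of columns as wide as that column's domain, writes "1" at the offset given by the recoded value, and copies all other columns through unchanged. The sketch below reproduces that expansion in isolation (the column list, domain sizes, and input row are invented for illustration):

import java.util.Arrays;

public class DummycodeSketch {

    // dcdList holds sorted 1-based column IDs to dummycode; domainSizes[i] is the
    // domain size of dcdList[i]; all other columns are passed through unchanged.
    static String[] dummycode(String[] words, int[] dcdList, int[] domainSizes) {
        int outLen = words.length;
        for (int d : domainSizes)
            outLen += d - 1;                         // same as _dummycodedLength above
        String[] nwords = new String[outLen];

        for (int colID = 1, idx = 0, ncolID = 1; colID <= words.length; colID++) {
            if (idx < dcdList.length && colID == dcdList[idx]) {
                int rcdVal = Integer.parseInt(words[colID - 1]);   // recoded value, 1-based
                nwords[ncolID - 1 + rcdVal - 1] = "1";
                ncolID += domainSizes[idx];
                idx++;
            } else {
                nwords[ncolID - 1] = words[colID - 1];
                ncolID++;
            }
        }
        return nwords;
    }

    public static void main(String[] args) {
        // column 2 is recoded with a domain of 3 values; columns 1 and 3 pass through
        String[] row = { "5.0", "2", "7.5" };
        System.out.println(Arrays.toString(dummycode(row, new int[]{2}, new int[]{3})));
        // -> [5.0, null, 1, null, 7.5]
    }
}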

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java 
b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
index e254403..4e3ece5 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java
@@ -1,107 +1,107 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-
-
-public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, 
DistinctValue>{
-       
-       private OutputCollector<IntWritable, DistinctValue> _collector = null; 
-       private int _mapTaskID = -1;
-       
-       TfUtils _agents = null;
-
-       private boolean _partFileWithHeader = false;
-       private boolean _firstRecordInSplit = true;
-       private String _partFileName = null;
-       private long _offsetInPartFile = -1;
-       
-       // 
----------------------------------------------------------------------------------------------
-       
-       /**
-        * Configure the information used in the mapper, and setup 
transformation agents.
-        */
-       @Override
-       public void configure(JobConf job) {
-               String[] parts = job.get("mapred.task.id").split("_");
-               if ( parts[0].equalsIgnoreCase("task")) {
-                       _mapTaskID = Integer.parseInt(parts[parts.length-1]);
-               }
-               else if ( parts[0].equalsIgnoreCase("attempt")) {
-                       _mapTaskID = Integer.parseInt(parts[parts.length-2]);
-               }
-               else {
-                       throw new RuntimeException("Unrecognized format for 
taskID: " + job.get("mapred.task.id"));
-               }
-
-               try {
-                       _partFileName = TfUtils.getPartFileName(job);
-                       _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-                       _agents = new TfUtils(job);
-               } catch(IOException e) { throw new RuntimeException(e); }
-                 catch(JSONException e)  { throw new RuntimeException(e); }
-
-       }
-       
-       
-       public void map(LongWritable rawKey, Text rawValue, 
OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws 
IOException  {
-               
-               if(_firstRecordInSplit)
-               {
-                       _firstRecordInSplit = false;
-                       _collector = out;
-                       _offsetInPartFile = rawKey.get();
-               }
-               
-               // ignore header
-               if (_agents.hasHeader() && rawKey.get() == 0 && 
_partFileWithHeader)
-                       return;
-               
-               _agents.prepareTfMtd(rawValue.toString());
-       }
-
-       @Override
-       public void close() throws IOException {
-               
_agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, 
_mapTaskID, _agents);
-               
_agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, 
_mapTaskID, _agents);
-               
_agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, 
_agents);
-               
-               // Output part-file offsets to create OFFSETS_FILE, which is to 
be used in csv reblocking.
-               // OffsetCount is denoted as a DistinctValue by concatenating 
parfile name and offset within partfile.
-               _collector.collect(new 
IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new 
OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid())));
-               
-               // reset global variables, required when the jvm is reused.
-               _firstRecordInSplit = true;
-               _offsetInPartFile = -1;
-               _partFileWithHeader = false;
-       }
-       
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+
+
+public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, 
DistinctValue>{
+       
+       private OutputCollector<IntWritable, DistinctValue> _collector = null; 
+       private int _mapTaskID = -1;
+       
+       TfUtils _agents = null;
+
+       private boolean _partFileWithHeader = false;
+       private boolean _firstRecordInSplit = true;
+       private String _partFileName = null;
+       private long _offsetInPartFile = -1;
+       
+       // 
----------------------------------------------------------------------------------------------
+       
+       /**
+        * Configure the information used in the mapper, and setup 
transformation agents.
+        */
+       @Override
+       public void configure(JobConf job) {
+               String[] parts = job.get("mapred.task.id").split("_");
+               if ( parts[0].equalsIgnoreCase("task")) {
+                       _mapTaskID = Integer.parseInt(parts[parts.length-1]);
+               }
+               else if ( parts[0].equalsIgnoreCase("attempt")) {
+                       _mapTaskID = Integer.parseInt(parts[parts.length-2]);
+               }
+               else {
+                       throw new RuntimeException("Unrecognized format for 
taskID: " + job.get("mapred.task.id"));
+               }
+
+               try {
+                       _partFileName = TfUtils.getPartFileName(job);
+                       _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
+                       _agents = new TfUtils(job);
+               } catch(IOException e) { throw new RuntimeException(e); }
+                 catch(JSONException e)  { throw new RuntimeException(e); }
+
+       }
+       
+       
+       public void map(LongWritable rawKey, Text rawValue, 
OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws 
IOException  {
+               
+               if(_firstRecordInSplit)
+               {
+                       _firstRecordInSplit = false;
+                       _collector = out;
+                       _offsetInPartFile = rawKey.get();
+               }
+               
+               // ignore header
+               if (_agents.hasHeader() && rawKey.get() == 0 && 
_partFileWithHeader)
+                       return;
+               
+               _agents.prepareTfMtd(rawValue.toString());
+       }
+
+       @Override
+       public void close() throws IOException {
+               
_agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, 
_mapTaskID, _agents);
+               
_agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, 
_mapTaskID, _agents);
+               
_agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, 
_agents);
+               
+               // Output part-file offsets to create OFFSETS_FILE, which is to 
be used in csv reblocking.
+               // OffsetCount is denoted as a DistinctValue by concatenating 
parfile name and offset within partfile.
+               _collector.collect(new 
IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new 
OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid())));
+               
+               // reset global variables, required when the jvm is reused.
+               _firstRecordInSplit = true;
+               _offsetInPartFile = -1;
+               _partFileWithHeader = false;
+       }
+       
+}
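
configure() above extracts the map task number from mapred.task.id, taking the last underscore-separated token for "task_..." IDs and the second-to-last for "attempt_..." IDs. A small standalone sketch of the same parsing rule (the sample IDs are invented):

public class TaskIdParseSketch {

    // Mirrors GTFMTDMapper.configure(): the task number's position depends on
    // whether the ID is a task ID or an attempt ID.
    static int parseMapTaskID(String taskId) {
        String[] parts = taskId.split("_");
        if (parts[0].equalsIgnoreCase("task"))
            return Integer.parseInt(parts[parts.length - 1]);
        else if (parts[0].equalsIgnoreCase("attempt"))
            return Integer.parseInt(parts[parts.length - 2]);
        throw new RuntimeException("Unrecognized format for taskID: " + taskId);
    }

    public static void main(String[] args) {
        System.out.println(parseMapTaskID("attempt_201601010000_0001_m_000007_0"));  // -> 7
        System.out.println(parseMapTaskID("task_201601010000_0001_m_000003"));       // -> 3
    }
}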
