http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
deleted file mode 100644
index 77c06ae..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper.IndexedBlockRow;
-import org.apache.sysml.runtime.matrix.mapred.MapperBase;
-
-@SuppressWarnings("deprecation")
-public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, Text, TaggedFirstSecondIndexes, CSVReblockMR.BlockRow>{
-       
-       boolean _partFileWithHeader = false;
-       TfUtils tfmapper = null;
-       Reporter _reporter = null;
-       
-       // variables relevant to CSV Reblock
-       private IndexedBlockRow idxRow = null;
-       private long rowOffset=0;
-       private HashMap<Long, Long> offsetMap=new HashMap<Long, Long>();
-       private boolean _first = true;
-       private long num=0;
-       
-       @Override
-       public void configure(JobConf job) {
-               super.configure(job);
-               try {
-                       _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-                       tfmapper = new TfUtils(job);
-                       tfmapper.loadTfMetadata(job, true);
-                       
-                       // Load relevant information for CSV Reblock
-                       ByteWritable key=new ByteWritable();
-                       OffsetCount value=new OffsetCount();
-                       Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-                       
-                       Path path=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-                       String thisfile=path.makeQualified(fs).toString();
-
-                       SequenceFile.Reader reader = null;
-                       try {
-                               reader = new SequenceFile.Reader(fs, p, job);
-                               while (reader.next(key, value)) {
-                                       // "key" needn't be checked since the offset file has information about a single CSV input (the raw data file)
-                                       if(thisfile.equals(value.filename))
-                                               offsetMap.put(value.fileOffset, value.count);
-                               }
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(reader);
-                       }
-                       
-                       idxRow = new CSVReblockMapper.IndexedBlockRow();
-                       int maxBclen=0;
-               
-                       for(ArrayList<CSVReblockInstruction> insv: csv_reblock_instructions)
-                               for(CSVReblockInstruction in: insv)
-                               {       
-                                       if(maxBclen<in.bclen)
-                                               maxBclen=in.bclen;
-                               }
-                       
-                       //always dense since dense is the common CSV use case
-                       idxRow.getRow().data.reset(1, maxBclen, false);         
-
-               } catch (IOException e) { throw new RuntimeException(e); }
-                catch(JSONException e)  { throw new RuntimeException(e); }
-
-       }
-       
-       @Override
-       public void map(LongWritable rawKey, Text rawValue, OutputCollector<TaggedFirstSecondIndexes,CSVReblockMR.BlockRow> out, Reporter reporter) throws IOException  {
-               
-               if(_first) {
-                       rowOffset=offsetMap.get(rawKey.get());
-                       _reporter = reporter;
-                       _first=false;
-               }
-               
-               // output the header line
-               if ( rawKey.get() == 0 && _partFileWithHeader ) 
-               {
-                       tfmapper.processHeaderLine();
-                       if ( tfmapper.hasHeader() )
-                               return;
-               }
-               
-               // parse the input line and apply transformation
-               String[] words = tfmapper.getWords(rawValue);
-               
-               if(!tfmapper.omit(words))
-               {
-                       words = tfmapper.apply(words);
-                       try {
-                               tfmapper.check(words);
-                               
-                               // Perform CSV Reblock
-                               CSVReblockInstruction ins = csv_reblock_instructions.get(0).get(0);
-                               idxRow = CSVReblockMapper.processRow(idxRow, words, rowOffset, num, ins.output, ins.brlen, ins.bclen, ins.fill, ins.fillValue, out);
-                       }
-                       catch(DMLRuntimeException e) {
-                               throw new RuntimeException(e.getMessage() + ":" + rawValue.toString());
-                       }
-                       num++;
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-       }
-
-       @Override
-       protected void specialOperationsForActualMap(int index,
-                       OutputCollector<Writable, Writable> out, Reporter reporter)
-                       throws IOException {
-       }
-
-}
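
A note for readers tracing this removal: configure() above rebuilds a byte-offset-to-row-count map from the CSVReblockMR offsets SequenceFile, so map() can translate a split's starting byte offset into a global row offset without a second pass over the data. A minimal, self-contained sketch of that lookup pattern follows; the OffsetCount class here is a stand-in with the same three fields as CSVReblockMR.OffsetCount, and all other names are illustrative, not SystemML API.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.HashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ByteWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;

    public class OffsetLookupSketch {
        // Stand-in for CSVReblockMR.OffsetCount: (filename, fileOffset, count).
        public static class OffsetCount implements Writable {
            String filename = "";
            long fileOffset, count;
            public void write(DataOutput out) throws IOException {
                out.writeUTF(filename); out.writeLong(fileOffset); out.writeLong(count);
            }
            public void readFields(DataInput in) throws IOException {
                filename = in.readUTF(); fileOffset = in.readLong(); count = in.readLong();
            }
        }

        // Scan the offsets file and keep only entries for this split's input file;
        // the resulting map translates byte offsets into global row offsets.
        public static HashMap<Long, Long> load(Configuration conf, Path offsetsFile,
                String thisFile) throws IOException {
            HashMap<Long, Long> offsetMap = new HashMap<Long, Long>();
            FileSystem fs = offsetsFile.getFileSystem(conf);
            ByteWritable key = new ByteWritable();
            OffsetCount value = new OffsetCount();
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, offsetsFile, conf);
            try {
                while (reader.next(key, value))
                    if (thisFile.equals(value.filename))
                        offsetMap.put(value.fileOffset, value.count);
            } finally {
                reader.close();
            }
            return offsetMap;
        }
    }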

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
deleted file mode 100644
index e2885d8..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.JobReturn;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-@SuppressWarnings("deprecation")
-public class ApplyTfCSVMR {
-       
-       public static JobReturn runJob(String inputPath, String spec, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
-               JobConf job = new JobConf(ApplyTfCSVMR.class);
-               job.setJobName("ApplyTfCSV");
-
-               /* Setup MapReduce Job */
-               job.setJarByClass(ApplyTfCSVMR.class);
-               
-               // set relevant classes
-               job.setMapperClass(ApplyTfCSVMapper.class);
-               job.setNumReduceTasks(0);
-       
-               // Add transformation metadata file as well as partOffsetsFile to Distributed cache
-               DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job);
-               DistributedCache.createSymlink(job);
-               
-               Path cachefile=new Path(partOffsetsFile);
-               DistributedCache.addCacheFile(cachefile.toUri(), job);
-               DistributedCache.createSymlink(job);
-               
-               // set input and output properties
-               job.setInputFormat(TextInputFormat.class);
-               job.setOutputFormat(TextOutputFormat.class);
-               
-               job.setMapOutputKeyClass(NullWritable.class);
-               job.setMapOutputValueClass(Text.class);
-               
-               job.setOutputKeyClass(NullWritable.class);
-               job.setOutputValueClass(Text.class);
-               
-               job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
-               
-               FileInputFormat.addInputPath(job, new Path(inputPath));
-               // delete outputPath, if exists already.
-               Path outPath = new Path(outputPath);
-               FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job);
-               fs.delete(outPath, true);
-               FileOutputFormat.setOutputPath(job, outPath);
-
-               job.set(MRJobConfiguration.TF_HAS_HEADER,       Boolean.toString(inputDataProperties.hasHeader()));
-               job.set(MRJobConfiguration.TF_DELIM,            inputDataProperties.getDelim());
-               if ( inputDataProperties.getNAStrings() != null)
-                       // Adding "dummy" string to handle the case of na_strings = ""
-                       job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
-               job.set(MRJobConfiguration.TF_SPEC, spec);
-               job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath));
-               job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath);
-               job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
-               job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
-               job.set(MRJobConfiguration.TF_HEADER, headerLine);
-               job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
-               job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
-               
-               //turn off adaptivemr
-               job.setBoolean("adaptivemr.map.enable", false);
-
-               // Run the job
-               RunningJob runjob = JobClient.runJob(job);
-               
-               // Since transform CSV produces part files w/ prefix transform-part-*,
-               // delete all the "default" part-..... files
-               deletePartFiles(fs, outPath);
-               
-               MatrixCharacteristics mc = new MatrixCharacteristics();
-               return new JobReturn(new MatrixCharacteristics[]{mc}, runjob.isSuccessful());
-       }
-       
-       private static void deletePartFiles(FileSystem fs, Path path) throws FileNotFoundException, IOException
-       {
-               PathFilter filter=new PathFilter(){
-                       public boolean accept(Path file) {
-                               return file.getName().startsWith("part-");
-               }
-               };
-               FileStatus[] list = fs.listStatus(path, filter);
-               for(FileStatus stat : list) {
-                       fs.delete(stat.getPath(), false);
-               }
-       }
-       
-}
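
The post-job cleanup above is a plain listStatus-plus-PathFilter sweep: the job's default part-* outputs are removed so only the custom transform-part-* files written by the mapper remain. A hedged, standalone sketch of the same idea (class and method names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class PartFileCleanupSketch {
        // Delete files directly under 'dir' whose names start with "part-",
        // leaving the custom "transform-part-*" outputs in place.
        public static void deleteDefaultPartFiles(Configuration conf, Path dir)
                throws IOException {
            FileSystem fs = dir.getFileSystem(conf);
            PathFilter filter = new PathFilter() {
                public boolean accept(Path file) {
                    return file.getName().startsWith("part-");
                }
            };
            for (FileStatus stat : fs.listStatus(dir, filter))
                fs.delete(stat.getPath(), false);
        }
    }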

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
deleted file mode 100644
index 05b8a19..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-
-public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> {
-       
-       boolean _firstRecordInSplit = true;
-       boolean _partFileWithHeader = false;
-       
-       TfUtils tfmapper = null;
-       Reporter _reporter = null;
-       BufferedWriter br = null;
-       JobConf _rJob = null;
-       
-       @Override
-       public void configure(JobConf job) {
-               try {
-                       _rJob = job;
-                       _partFileWithHeader = TfUtils.isPartFileWithHeader(job);
-                       tfmapper = new TfUtils(job);
-                       
-                       tfmapper.loadTfMetadata(job, true);
-                       
-               } catch (IOException e) { throw new RuntimeException(e); }
-               catch(JSONException e)  { throw new RuntimeException(e); }
-
-       }
-       
-       @Override
-       public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException  {
-               
-               if(_firstRecordInSplit)
-               {
-                       _firstRecordInSplit = false;
-                       _reporter = reporter;
-                       
-                       // generate custom output paths so that order of rows in the
-                       // output (across part files) matches w/ that from input data set
-                       String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get());
-                       Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix);
-                       
-                       // setup the writer for mapper's output
-                       // the default part-..... files will be deleted later once the job finishes
-                       FileSystem fs = IOUtilFunctions.getFileSystem(mapOutputPath);
-                       br = new BufferedWriter(new OutputStreamWriter(fs.create( mapOutputPath, true)));
-               }
-               
-               // output the header line
-               if ( rawKey.get() == 0 && _partFileWithHeader ) 
-               {
-                       _reporter = reporter;
-                       tfmapper.processHeaderLine();
-                       if ( tfmapper.hasHeader() )
-                               return;
-               }
-               
-               // parse the input line and apply transformation
-               String[] words = tfmapper.getWords(rawValue);
-               
-               if(!tfmapper.omit(words))
-               {
-                       try {
-                               words = tfmapper.apply(words);
-                               String outStr = tfmapper.checkAndPrepOutputString(words);
-                               //out.collect(NullWritable.get(), new Text(outStr));
-                               br.write(outStr + "\n");
-                       } 
-                       catch(DMLRuntimeException e) {
-                               throw new RuntimeException(e.getMessage() + ": " + rawValue.toString());
-                       }
-               }
-       }
-
-       @Override
-       public void close() throws IOException {
-               IOUtilFunctions.closeSilently(br);
-       }
-
-}
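
The mapper above intentionally bypasses the OutputCollector and lazily opens one deterministically named HDFS file per split on the first record, which is what keeps output row order aligned with the input across part files. A minimal sketch of that lazy per-split writer pattern (names are illustrative, not SystemML API):

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SplitLocalWriterSketch {
        private BufferedWriter br = null;

        // Write one output line; on the first call, create the split's own
        // file so its name (and thus global row order) is deterministic.
        public void writeLine(Configuration conf, String outDir, String splitId,
                String line) throws IOException {
            if (br == null) { // first record in this split
                Path out = new Path(outDir + "/transform-part-" + splitId);
                FileSystem fs = out.getFileSystem(conf);
                br = new BufferedWriter(new OutputStreamWriter(fs.create(out, true)));
            }
            br.write(line + "\n");
        }

        public void close() throws IOException {
            if (br != null)
                br.close();
        }
    }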

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
deleted file mode 100644
index b820449..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-
-public class ApplyTfCSVSPARK {
-       
-       /**
-        * Apply transformation metadata and generate the result in CSV format, as a
-        * JavaRDD of Strings.
-        * 
-        * @param sec spark execution context
-        * @param inputRDD input rdd
-        * @param tfMtdPath transform metadata path
-        * @param spec transform specification as json string
-        * @param tmpPath temporary file path
-        * @param prop csv file format properties
-        * @param numCols number of columns
-        * @param headerLine header line
-        * @return JavaPairRDD of long-strings
-        * @throws IOException if IOException occurs
-        * @throws ClassNotFoundException if ClassNotFoundException occurs
-        * @throws InterruptedException if InterruptedException occurs
-        * @throws IllegalArgumentException if IllegalArgumentException occurs
-        * @throws JSONException if JSONException occurs
-        */
-       public static JavaPairRDD<Long, String> runSparkJob(
-                       SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD,
-                       String tfMtdPath, String spec, String tmpPath, CSVFileFormatProperties prop,
-                       int numCols, String headerLine)
-               throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException
-       {
-               // Load transformation metadata and broadcast it
-               String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings());
-               JSONObject jspec = new JSONObject(spec);
-               TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, jspec, numCols, tfMtdPath, null, tmpPath);
-               
-               _tfmapper.loadTfMetadata();
-
-               Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper);
-               
-               /*
-                * Construct transformation metadata (map-side) -- the logic is similar
-                * to GTFMTDMapper
-                * 
-                * Note: The result of mapPartitionsWithIndex is cached so that the
-                * transformed data is not redundantly computed multiple times
-                */
-               JavaPairRDD<Long, String> applyRDD = inputRDD
-                               .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf),  true)
-                               .mapToPair(
-                                               new PairFunction<String,Long,String>(){
-                                                       private static final long serialVersionUID = 3868143093999082931L;
-                                                       @Override
-                                                       public Tuple2<Long, String> call(String t) throws Exception {
-                                                               return new Tuple2<Long, String>(new Long(1), t);
-                                                       }
-                                               }
-                               ).cache();
-
-               /*
-                * An action to force execution of apply()
-                * 
-                * We need to trigger the execution of this RDD so as to ensure the
-                * creation of a few metadata files (headers, dummycoded information,
-                * etc.), which are referenced in the caller function.
-                */
-               applyRDD.count();
-               
-               return applyRDD;
-       }
-
-       public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> {
-
-               private static final long serialVersionUID = 1496686437276906911L;
-
-               TfUtils _tfmapper = null;
-               
-               ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException {
-                       _tfmapper = tf.getValue();
-               }
-               
-               @Override
-               public Iterator<String> call(Integer partitionID,
-                               Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-                       
-                       boolean first = true;
-                       Tuple2<LongWritable, Text> rec = null;
-                       ArrayList<String> outLines = new ArrayList<String>();
-                       
-                       while(csvLines.hasNext()) {
-                               rec = csvLines.next();
-                               
-                               if (first && partitionID == 0) {
-                                       first = false;
-                                       
-                                       _tfmapper.processHeaderLine();
-                                       
-                                       if (_tfmapper.hasHeader() ) {
-                                               continue; 
-                                       }
-                               }
-                               
-                               // parse the input line and apply transformation
-                       
-                               String[] words = _tfmapper.getWords(rec._2());
-                               
-                               if(!_tfmapper.omit(words))
-                               {
-                                       try {
-                                               words = _tfmapper.apply(words);
-                                               String outStr = _tfmapper.checkAndPrepOutputString(words);
-                                               outLines.add(outStr);
-                                       } 
-                                       catch(DMLRuntimeException e) {
-                                               throw new RuntimeException(e.getMessage() + ": " + rec._2().toString());
-                                       }
-                               }
-                       }
-                       
-                       return outLines.iterator();
-               }
-               
-       }
-
-       
-}
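
The Spark port above follows a broadcast-plus-mapPartitionsWithIndex pattern: shared transform state is broadcast once, partition 0 skips the header because it knows its index, the result is cached, and a count() action forces execution so side-effect metadata files exist before the caller reads them. A small runnable sketch of that pattern, using a broadcast string as a stand-in for TfUtils and Java 8 lambdas instead of the anonymous classes above:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class PartitionTransformSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("sketch").setMaster("local[2]"));
            Broadcast<String> prefix = sc.broadcast(">> "); // stand-in for the TfUtils broadcast

            JavaRDD<String> lines = sc.parallelize(Arrays.asList("header", "a", "b"), 2);

            JavaRDD<String> out = lines.mapPartitionsWithIndex(
                    (Integer partitionID, Iterator<String> it) -> {
                        ArrayList<String> res = new ArrayList<String>();
                        boolean first = true;
                        while (it.hasNext()) {
                            String line = it.next();
                            // only partition 0 can see the header line
                            if (first && partitionID == 0) { first = false; continue; }
                            res.add(prefix.getValue() + line);
                        }
                        return res.iterator();
                    }, true).cache();

            out.count(); // action that forces execution (and any side effects)
            sc.stop();
        }
    }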

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
deleted file mode 100644
index 8878ff0..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.CharacterCodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.wink.json4j.JSONArray;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-import org.apache.sysml.lops.Lop;
-import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
-import org.apache.sysml.runtime.transform.encode.Encoder;
-import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class BinAgent extends Encoder 
-{      
-       private static final long serialVersionUID = 1917445005206076078L;
-
-       public static final String MIN_PREFIX = "min";
-       public static final String MAX_PREFIX = "max";
-       public static final String NBINS_PREFIX = "nbins";
-
-       private int[] _numBins = null;
-       private double[] _min=null, _max=null;  // min and max among non-missing values
-       private double[] _binWidths = null;             // width of a bin for each attribute
-       
-       //frame transform-apply attributes
-       private double[][] _binMins = null;
-       private double[][] _binMaxs = null;
-       
-       public BinAgent(JSONObject parsedSpec, String[] colnames, int clen) 
-               throws JSONException, IOException 
-       {
-               this(parsedSpec, colnames, clen, false);
-       }
-
-       public BinAgent(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly)
-               throws JSONException, IOException 
-       {
-               super( null, clen );            
-               if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) )
-                       return;
-               
-               if( colsOnly ) {
-                       List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames);
-                       initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0])));
-               }
-               else 
-               {
-                       JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN);
-                       JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS);
-                       JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS);
-                       initColList(attrs);
-                       
-                       _numBins = new int[attrs.size()];
-                       for(int i=0; i < _numBins.length; i++)
-                               _numBins[i] = UtilFunctions.toInt(nbins.get(i));
-                       
-                       // initialize internal transformation metadata
-                       _min = new double[_colList.length];
-                       Arrays.fill(_min, Double.MAX_VALUE);
-                       _max = new double[_colList.length];
-                       Arrays.fill(_max, -Double.MAX_VALUE);
-                       
-                       _binWidths = new double[_colList.length];
-               }
-       }
-
-       public int[] getNumBins() { return _numBins; }
-       public double[] getMin()  { return _min; }
-       public double[] getBinWidths() { return _binWidths; }
-       
-       public void prepare(String[] words, TfUtils agents) {
-               if ( !isApplicable() )
-                       return;
-               
-               for(int i=0; i <_colList.length; i++) {
-                       int colID = _colList[i];
-                       
-                       String w = null;
-                       double d = 0;
-                               
-                       // equi-width
-                       w = UtilFunctions.unquote(words[colID-1].trim());
-                       if(!TfUtils.isNA(agents.getNAStrings(),w)) {
-                               d = UtilFunctions.parseToDouble(w);
-                               if(d < _min[i])
-                                       _min[i] = d;
-                               if(d > _max[i])
-                                       _max[i] = d;
-                       }
-               }
-       }
-       
-       private DistinctValue prepMinOutput(int idx) throws CharacterCodingException {
-               String s =  MIN_PREFIX + Double.toString(_min[idx]);
-               return  new DistinctValue(s, -1L);
-       }
-       
-       private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException {
-               String s =  MAX_PREFIX + Double.toString(_max[idx]);
-               return  new DistinctValue(s, -1L);
-       }
-       
-       private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException {
-               String s =  NBINS_PREFIX + Double.toString(_numBins[idx]);
-               return  new DistinctValue(s, -1L);
-       }
-       
-       /**
-        * Method to output transformation metadata from the mappers. 
-        * This information is collected and merged by the reducers.
-        */
-       @Override
-       public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException {
-               if( !isApplicable() )
-                       return;
-               
-               try { 
-                       for(int i=0; i < _colList.length; i++) {
-                               int colID = _colList[i];
-                               IntWritable iw = new IntWritable(-colID);
-                               
-                               out.collect(iw,  prepMinOutput(i));
-                               out.collect(iw,  prepMaxOutput(i));
-                               out.collect(iw,  prepNBinsOutput(i));
-                       }
-               } catch(Exception e) {
-                       throw new IOException(e);
-               }
-       }
-       
-       public ArrayList<Pair<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException {
-               if ( !isApplicable() )
-                       return list;
-               
-               try { 
-                       for(int i=0; i < _colList.length; i++) {
-                               int colID = _colList[i];
-                               Integer iw = -colID;
-                               
-                               list.add( new Pair<Integer,DistinctValue>(iw, prepMinOutput(i)) );
-                               list.add( new Pair<Integer,DistinctValue>(iw, prepMaxOutput(i)) );
-                               list.add( new Pair<Integer,DistinctValue>(iw, prepNBinsOutput(i)) );
-                       }
-               } catch(Exception e) {
-                       throw new IOException(e);
-               }
-               return list;
-       }
-
-       private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException
-       {
-               Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
-               BufferedWriter br = null;
-               try {
-                       br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
-                       br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n");
-               }
-               finally {
-                       IOUtilFunctions.closeSilently(br);
-               }
-       }
-
-       /** 
-        * Method to merge map output transformation metadata.
-        */
-       @Override
-       public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
-               double min = Double.MAX_VALUE;
-               double max = -Double.MAX_VALUE;
-               int nbins = 0;
-               
-               DistinctValue val = new DistinctValue();
-               String w = null;
-               double d;
-               while(values.hasNext()) {
-                       val.reset();
-                       val = values.next();
-                       w = val.getWord();
-                       
-                       if(w.startsWith(MIN_PREFIX)) {
-                               d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() ));
-                               if ( d < min )
-                                       min = d;
-                       }
-                       else if(w.startsWith(MAX_PREFIX)) {
-                               d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() ));
-                               if ( d > max )
-                                       max = d;
-                       }
-                       else if (w.startsWith(NBINS_PREFIX)) {
-                               nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) );
-                       }
-                       else
-                               throw new RuntimeException("BinAgent: Invalid prefix while merging map output: " + w);
-               }
-               
-               // write merged metadata
-               double binwidth = (max-min)/nbins;
-               writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents);
-       }
-       
-       
-       public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException {
-               if( !isApplicable() )
-                       return;
-               
-               MVImputeAgent mvagent = agents.getMVImputeAgent();
-               for(int i=0; i < _colList.length; i++) {
-                       int colID = _colList[i];
-                       
-                       // If the column is imputed with a constant, then adjust min and max based on the value of the constant.
-                       if ( mvagent.isApplicable(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT )
-                       {
-                               double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) );
-                               if ( cst < _min[i])
-                                       _min[i] = cst;
-                               if ( cst > _max[i])
-                                       _max[i] = cst;
-                       }
-                       
-                       double binwidth = (_max[i] - _min[i])/_numBins[i];
-                       writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents);
-               }
-       }
-       
-       // ------------------------------------------------------------------------------------------------
-
-       /**
-        * Method to load transform metadata for all attributes
-        */
-       @Override
-       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException {
-               if( !isApplicable() )
-                       return;
-               
-               if(fs.isDirectory(txMtdDir)) {
-                       for(int i=0; i<_colList.length;i++) {
-                               int colID = _colList[i];
-                               
-                               Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX);
-                               TfUtils.checkValidInputFile(fs, path, true); 
-                                       
-                               BufferedReader br = null;
-                               try {
-                                       br = new BufferedReader(new InputStreamReader(fs.open(path)));
-                                       // format: colID,min,max,binwidth,nbins
-                                       String[] fields = br.readLine().split(TfUtils.TXMTD_SEP);
-                                       double min = UtilFunctions.parseToDouble(fields[1]);
-                                       //double max = UtilFunctions.parseToDouble(fields[2]);
-                                       double binwidth = UtilFunctions.parseToDouble(fields[3]);
-                                       int nbins = UtilFunctions.parseToInt(fields[4]);
-                                       
-                                       _numBins[i] = nbins;
-                                       _min[i] = min;
-                                       _binWidths[i] = binwidth; // (max-min)/nbins;
-                               }
-                               finally {
-                                       IOUtilFunctions.closeSilently(br);
-                               }
-                       }
-               }
-               else {
-                       throw new RuntimeException("Path to bin maps must be a directory: " + txMtdDir);
-               }
-       }
-       
-
-       @Override
-       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               build(in);
-               return apply(in, out);
-       }
-
-       @Override
-       public void build(FrameBlock in) {
-               // TODO Auto-generated method stub
-       }
-       
-       /**
-        * Method to apply transformations.
-        */
-       @Override
-       public String[] apply(String[] words) {
-               if( !isApplicable() )
-                       return words;
-       
-               for(int i=0; i < _colList.length; i++) {
-                       int colID = _colList[i];
-                       try {
-                               double val = UtilFunctions.parseToDouble(words[colID-1]);
-                               int binid = 1;
-                               double tmp = _min[i] + _binWidths[i];
-                               while(val > tmp && binid < _numBins[i]) {
-                                       tmp += _binWidths[i];
-                                       binid++;
-                               }
-                               words[colID-1] = Integer.toString(binid);
-                       } 
-                       catch(NumberFormatException e) {
-                               throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method.");
-                       }
-               }
-               
-               return words;
-       }
-
-       @Override
-       public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
-               for(int j=0; j<_colList.length; j++) {
-                       int colID = _colList[j];
-                       for( int i=0; i<in.getNumRows(); i++ ) {
-                               double inVal = UtilFunctions.objectToDouble(
-                                               in.getSchema()[colID-1], in.get(i, colID-1));
-                               int ix = Arrays.binarySearch(_binMaxs[j], inVal);
-                               int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1;
-                               out.quickSetValue(i, colID-1, binID);
-                       }       
-               }
-               return out;
-       }
-
-       @Override
-       public FrameBlock getMetaData(FrameBlock meta) {
-               return meta;
-       }
-       
-       @Override
-       public void initMetaData(FrameBlock meta) {
-               _binMins = new double[_colList.length][];
-               _binMaxs = new double[_colList.length][];
-               for( int j=0; j<_colList.length; j++ ) {
-                       int colID = _colList[j]; //1-based
-                       int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct();
-                       _binMins[j] = new double[nbins];
-                       _binMaxs[j] = new double[nbins];
-                       for( int i=0; i<nbins; i++ ) {
-                               String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX);
-                               _binMins[j][i] = Double.parseDouble(tmp[0]);
-                               _binMaxs[j][i] = Double.parseDouble(tmp[1]);
-                       }
-               }
-       }
-}
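
For reference, the linear scan in apply() above assigns equi-width bin ids; up to how values that fall exactly on a bin boundary are treated, it is equivalent to the closed form floor((v - min) / binWidth) + 1, clamped to [1, nbins]. A tiny illustrative sketch (hypothetical helper, not SystemML API):

    public class EquiWidthBinSketch {
        // v binned over [min, max] with nbins equal-width bins of width binWidth;
        // bin ids are 1-based, clamped so out-of-range values land in an end bin.
        public static int binId(double v, double min, double binWidth, int nbins) {
            int bin = (int) Math.floor((v - min) / binWidth) + 1;
            return Math.max(1, Math.min(nbins, bin));
        }

        public static void main(String[] args) {
            // 4 bins over [0, 8), width 2: 5.0 falls in bin 3
            System.out.println(binId(5.0, 0.0, 2.0, 4));
        }
    }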
