http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
index 1a646cf..2e3fd75 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java
@@ -1,124 +1,124 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.wink.json4j.JSONException;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-
-public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> {
-       
-       private JobConf _rJob = null;
-       TfUtils _agents = null;
-       
-       @Override
-       public void configure(JobConf job) {
-               _rJob = job;
-               
-               try {
-                       String outputDir = MRJobConfiguration.getOutputs(job)[0];
-                       _agents = new TfUtils(job, outputDir);
-               } 
-               catch(IOException e)  { throw new RuntimeException(e); }
-               catch(JSONException e)  { throw new RuntimeException(e); }
-       }
-
-       @Override
-       public void close() throws IOException {
-       }
-       
-       @Override
-       public void reduce(IntWritable key, Iterator<DistinctValue> values,
-                       OutputCollector<Text, LongWritable> output, Reporter reporter)
-                       throws IOException {
-               
-               FileSystem fs = FileSystem.get(_rJob);
-               
-               int colID = key.get();
-               
-               if(colID < 0) 
-               {
-                       // process mapper output for MV and Bin agents
-                       colID = colID*-1;
-                       _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
-               }
-               else if ( colID == _agents.getNumCols() + 1)
-               {
-                       // process mapper output for OFFSET_FILE
-                       ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
-                       while(values.hasNext())
-                               list.add(new OffsetCount(values.next().getOffsetCount()));
-                       
-                       long numTfRows = generateOffsetsFile(list);
-                       reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows);
-
-               }
-               else 
-               {
-                       // process mapper output for Recode agent
-                       _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
-               }
-               
-       }
-       
-       @SuppressWarnings("unchecked")
-       private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException 
-       {
-               Collections.sort(list);
-               
-               @SuppressWarnings("deprecation")
-               SequenceFile.Writer writer = new SequenceFile.Writer(
-                               FileSystem.get(_rJob), _rJob, 
-                               new Path(_agents.getOffsetFile()+"/part-00000"), 
-                               ByteWritable.class, OffsetCount.class);
-               
-               long lineOffset=0;
-               for(OffsetCount oc: list)
-               {
-                       long count=oc.count;
-                       oc.count=lineOffset;
-                       writer.append(new ByteWritable((byte)0), oc);
-                       lineOffset+=count;
-               }
-               writer.close();
-               list.clear();
-               
-               return lineOffset;
-       }
-       
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+
+public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> {
+       
+       private JobConf _rJob = null;
+       TfUtils _agents = null;
+       
+       @Override
+       public void configure(JobConf job) {
+               _rJob = job;
+               
+               try {
+                       String outputDir = MRJobConfiguration.getOutputs(job)[0];
+                       _agents = new TfUtils(job, outputDir);
+               } 
+               catch(IOException e)  { throw new RuntimeException(e); }
+               catch(JSONException e)  { throw new RuntimeException(e); }
+       }
+
+       @Override
+       public void close() throws IOException {
+       }
+       
+       @Override
+       public void reduce(IntWritable key, Iterator<DistinctValue> values,
+                       OutputCollector<Text, LongWritable> output, Reporter reporter)
+                       throws IOException {
+               
+               FileSystem fs = FileSystem.get(_rJob);
+               
+               int colID = key.get();
+               
+               if(colID < 0) 
+               {
+                       // process mapper output for MV and Bin agents
+                       colID = colID*-1;
+                       _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
+               }
+               else if ( colID == _agents.getNumCols() + 1)
+               {
+                       // process mapper output for OFFSET_FILE
+                       ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
+                       while(values.hasNext())
+                               list.add(new OffsetCount(values.next().getOffsetCount()));
+                       
+                       long numTfRows = generateOffsetsFile(list);
+                       reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows);
+
+               }
+               else 
+               {
+                       // process mapper output for Recode agent
+                       _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents);
+               }
+               
+       }
+       
+       @SuppressWarnings("unchecked")
+       private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException 
+       {
+               Collections.sort(list);
+               
+               @SuppressWarnings("deprecation")
+               SequenceFile.Writer writer = new SequenceFile.Writer(
+                               FileSystem.get(_rJob), _rJob, 
+                               new Path(_agents.getOffsetFile()+"/part-00000"), 
+                               ByteWritable.class, OffsetCount.class);
+               
+               long lineOffset=0;
+               for(OffsetCount oc: list)
+               {
+                       long count=oc.count;
+                       oc.count=lineOffset;
+                       writer.append(new ByteWritable((byte)0), oc);
+                       lineOffset+=count;
+               }
+               writer.close();
+               list.clear();
+               
+               return lineOffset;
+       }
+       
+}
+
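
A note for readers of this diff: the key encoding in reduce() routes work by column ID -- negative keys carry MV/Bin agent metadata, numCols+1 carries the per-split OffsetCount records, and any other positive key carries Recode metadata. generateOffsetsFile() then sorts the OffsetCount records and rewrites each split's row count into its global starting row via a prefix sum; the running total it returns is the number of transformed rows. A minimal, self-contained sketch of that prefix-sum step (plain long[] pairs instead of OffsetCount; all names here are illustrative, not SystemML API):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OffsetPrefixSumSketch {
    public static void main(String[] args) {
        // per-split row counts in arbitrary arrival order: {splitId, rowCount}
        List<long[]> splits = new ArrayList<long[]>();
        splits.add(new long[]{2, 40});
        splits.add(new long[]{0, 100});
        splits.add(new long[]{1, 25});

        // sort by split id, mirroring Collections.sort(list) on OffsetCount
        Collections.sort(splits, (a, b) -> Long.compare(a[0], b[0]));

        // replace each count with the running offset, as the reducer does
        long lineOffset = 0;
        for (long[] s : splits) {
            long count = s[1];
            s[1] = lineOffset;   // split now records its global start row
            lineOffset += count;
        }
        System.out.println("total transformed rows = " + lineOffset); // 165
    }
}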

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
index b1e79dd..09b9148 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
@@ -1,106 +1,106 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-
-/**
- * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format).
- *
- */
-
-public class GenTfMtdMR {
-
-       public static final String DELIM = ",";
-
-       public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
-               JobConf job = new JobConf(GenTfMtdMR.class);
-               job.setJobName("GenTfMTD");
-               
-               /* Setup MapReduce Job */
-               job.setJarByClass(GenTfMtdMR.class);
-               
-               // set relevant classes
-               job.setMapperClass(GTFMTDMapper.class);
-               job.setReducerClass(GTFMTDReducer.class);
-       
-               // set input and output properties
-               job.setInputFormat(TextInputFormat.class);
-               job.setOutputFormat(NullOutputFormat.class);
-               
-               job.setMapOutputKeyClass(IntWritable.class);
-               job.setMapOutputValueClass(DistinctValue.class);
-               
-               job.setOutputKeyClass(Text.class);
-               job.setOutputValueClass(LongWritable.class);
-               
-               job.setInt("dfs.replication", replication);
-
-               FileInputFormat.addInputPath(job, new Path(inputPath));
-               // delete outputPath, if exists already.
-               Path outPath = new Path(txMtdPath);
-               FileSystem fs = FileSystem.get(job);
-               fs.delete(outPath, true);
-               FileOutputFormat.setOutputPath(job, outPath);
-
-               job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader()));
-               job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim());
-               if ( inputDataProperties.getNAStrings() != null)
-                       // Adding "dummy" string to handle the case of na_strings = ""
-                       job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
-               job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs);
-               job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile);
-               job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
-               job.set(MRJobConfiguration.TF_HEADER, headerLine);
-               
-               job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath);
-               
-               // offsets file to store part-file names and offsets for each input split
-               job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile);
-               
-               //turn off adaptivemr
-               job.setBoolean("adaptivemr.map.enable", false);
-               
-               // Run the job
-               RunningJob runjob = JobClient.runJob(job);
-               
-               Counters c = runjob.getCounters();
-               long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter();
-
-               return tx_numRows;
-       }
-       
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
+
+/**
+ * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format).
+ *
+ */
+
+public class GenTfMtdMR {
+
+       public static final String DELIM = ",";
+
+       public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException {
+               JobConf job = new JobConf(GenTfMtdMR.class);
+               job.setJobName("GenTfMTD");
+               
+               /* Setup MapReduce Job */
+               job.setJarByClass(GenTfMtdMR.class);
+               
+               // set relevant classes
+               job.setMapperClass(GTFMTDMapper.class);
+               job.setReducerClass(GTFMTDReducer.class);
+       
+               // set input and output properties
+               job.setInputFormat(TextInputFormat.class);
+               job.setOutputFormat(NullOutputFormat.class);
+               
+               job.setMapOutputKeyClass(IntWritable.class);
+               job.setMapOutputValueClass(DistinctValue.class);
+               
+               job.setOutputKeyClass(Text.class);
+               job.setOutputValueClass(LongWritable.class);
+               
+               job.setInt("dfs.replication", replication);
+
+               FileInputFormat.addInputPath(job, new Path(inputPath));
+               // delete outputPath, if exists already.
+               Path outPath = new Path(txMtdPath);
+               FileSystem fs = FileSystem.get(job);
+               fs.delete(outPath, true);
+               FileOutputFormat.setOutputPath(job, outPath);
+
+               job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader()));
+               job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim());
+               if ( inputDataProperties.getNAStrings() != null)
+                       // Adding "dummy" string to handle the case of na_strings = ""
+                       job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
+               job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs);
+               job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile);
+               job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols);
+               job.set(MRJobConfiguration.TF_HEADER, headerLine);
+               
+               job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath);
+               
+               // offsets file to store part-file names and offsets for each input split
+               job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile);
+               
+               //turn off adaptivemr
+               job.setBoolean("adaptivemr.map.enable", false);
+               
+               // Run the job
+               RunningJob runjob = JobClient.runJob(job);
+               
+               Counters c = runjob.getCounters();
+               long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter();
+
+               return tx_numRows;
+       }
+       
+}
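
For context, a hedged driver sketch showing how runJob() above would be invoked; the argument order matches the signature in this diff, but every path and value below is a hypothetical placeholder, and constructing the CSVFileFormatProperties is assumed to happen elsewhere in the caller:

import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
import org.apache.sysml.runtime.transform.GenTfMtdMR;

public class GenTfMtdDriverSketch {
    public static void main(String[] args) throws Exception {
        // assumed: built from the CSV read parameters (header flag,
        // delimiter, NA strings) by the surrounding runtime
        CSVFileFormatProperties props = null;

        long numTransformedRows = GenTfMtdMR.runJob(
                "hdfs:/user/x/data.csv",           // inputPath (hypothetical)
                "hdfs:/user/x/tfmtd",              // txMtdPath: metadata output dir
                "hdfs:/user/x/spec_withIDs.json",  // specFileWithIDs
                "hdfs:/user/x/data.csv",           // smallestFile
                "hdfs:/user/x/partOffsets",        // partOffsetsFile
                props,                             // input CSV properties
                50,                                // numCols
                1,                                 // replication
                "id,age,zip");                     // headerLine (hypothetical)
        System.out.println("rows after transform: " + numTransformedRows);
    }
}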

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
index 6b811ef..e0644ff 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
@@ -1,235 +1,235 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.wink.json4j.JSONException;
-import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-
-public class GenTfMtdSPARK {
-
-       /**
-        * Spark code to Generate Transform Metadata based on the given transformation
-        * specification file (JSON format).
-        * 
-        */
-
-       public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
-                                                                       String tfMtdPath, String specFile, 
-                                                                       String partOffsetsFile, CSVFileFormatProperties prop, 
-                                                                       long numCols, String headerLine
-                                                               ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
-               
-               // Construct transformation metadata (map-side)
-               // Note: logic is similar to GTFMTDMapper
-               JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput 
-                       = inputRDD.mapPartitionsWithIndex(
-                                       new GenTfMtdMap(prop.hasHeader(), 
-                                                                       prop.getDelim(), 
-                                                                       prop.getNAStrings(), 
-                                                                       specFile, 
-                                                                       numCols, 
-                                                                       headerLine), 
-                                       true );
-               
-               // Shuffle to group by DistinctValue
-               JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey();
-               
-               // Construct transformation metadata (Reduce-side)
-               // Note: logic is similar to GTFMTDReducer
-               JavaRDD<Long> out 
-                       = rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(), 
-                                                                                               prop.getDelim(), 
-                                                                                               prop.getNAStrings(), 
-                                                                                               headerLine, 
-                                                                                               tfMtdPath, 
-                                                                                               partOffsetsFile, 
-                                                                                               specFile, 
-                                                                                               numCols)  );
-               
-               // Compute the total number of transformed rows
-               long numRows = out.reduce(new Function2<Long,Long,Long>() {
-                       private static final long serialVersionUID = 1263336168859959795L;
-
-                       @Override
-                       public Long call(Long v1, Long v2) throws Exception {
-                               return v1+v2;
-                       }
-                       
-               });
-               
-               return numRows;
-       }
-       
-       // ----------------------------------------------------------------------------------------------------------------------
-       
-       public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> {
-
-               private static final long serialVersionUID = -5622745445470598215L;
-               
-               TfUtils _agents = null;
-               
-               GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException {
-                       
-                       // Setup Transformation Agents
-                       JobConf job = new JobConf();
-                       FileSystem fs = FileSystem.get(job);
-                       String[] nas = TfUtils.parseNAStrings(naStrings);
-                       
-                       JSONObject spec = TfUtils.readSpec(fs, specFile);
-                       _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null);
-
-               }
-               
-               @Override
-               public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID,
-                               Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
-                       
-                       // Construct transformation metadata by looping through csvLines
-                       // Note: logic is similar to GTFMTDMapper
-                       
-                       boolean first = true;
-                       Tuple2<LongWritable, Text> rec = null;
-                       long _offsetInPartFile = -1;
-                       
-                       while(csvLines.hasNext()) {
-                               rec = csvLines.next();
-                               
-                               if (first) {
-                                       first = false;
-                                       _offsetInPartFile = rec._1().get();
-                                       
-                                       if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 )
-                                               continue; // skip the header line
-                               }
-                               
-                               _agents.prepareTfMtd(rec._2().toString());
-                       }
-                       
-                       // Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated. 
-                       
-                       ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>();
-                       
-                       _agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
-                       _agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
-                       _agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
-                       
-                       DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal()));
-                       Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); 
-                       outList.add(tuple);
-
-                       return outList.iterator();
-               }
-               
-       }
-       
-       // ------------------------------------------------------------------------------------------------
-       
-       public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> {
-               
-               private static final long serialVersionUID = -2733233671193035242L;
-               TfUtils _agents = null;
-               
-               GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException {
-                       String[] nas = TfUtils.parseNAStrings(naStrings); 
-                       FileSystem fs = FileSystem.get(new JobConf());
-
-                       JSONObject spec = TfUtils.readSpec(fs, specFile);
-                       _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null);
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t)
-                               throws Exception {
-                       
-                       int colID = t._1();
-                       Iterator<DistinctValue> iterDV = t._2().iterator();
-
-                       JobConf job = new JobConf();
-                       FileSystem fs = FileSystem.get(job);
-                       
-                       ArrayList<Long> numRows = new ArrayList<Long>();
-                       
-                       if(colID < 0) 
-                       {
-                               // process mapper output for MV and Bin agents
-                               colID = colID*-1;
-                               _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
-                               numRows.add(0L);
-                       }
-                       else if ( colID == _agents.getNumCols() + 1)
-                       {
-                               // process mapper output for OFFSET_FILE
-                               ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
-                               while(iterDV.hasNext())
-                                       list.add(new OffsetCount(iterDV.next().getOffsetCount()));
-                               Collections.sort(list);
-                               
-                               @SuppressWarnings("deprecation")
-                               SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class);
-                               
-                               long lineOffset=0;
-                               for(OffsetCount oc: list)
-                               {
-                                       long count=oc.count;
-                                       oc.count=lineOffset;
-                                       writer.append(new ByteWritable((byte)0), oc);
-                                       lineOffset+=count;
-                               }
-                               writer.close();
-                               list.clear();
-                               
-                               numRows.add(lineOffset);
-                       }
-                       else 
-                       {
-                               // process mapper output for Recode agent
-                               _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
-                               numRows.add(0L);
-                       }
-                       
-                       return numRows;
-               }
-
-       }
-
-       
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.wink.json4j.JSONException;
+import org.apache.wink.json4j.JSONObject;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+
+public class GenTfMtdSPARK {
+
+       /**
+        * Spark code to Generate Transform Metadata based on the given transformation
+        * specification file (JSON format).
+        * 
+        */
+
+       public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, 
+                                                                       String tfMtdPath, String specFile, 
+                                                                       String partOffsetsFile, CSVFileFormatProperties prop, 
+                                                                       long numCols, String headerLine
+                                                               ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException {
+               
+               // Construct transformation metadata (map-side)
+               // Note: logic is similar to GTFMTDMapper
+               JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput 
+                       = inputRDD.mapPartitionsWithIndex(
+                                       new GenTfMtdMap(prop.hasHeader(), 
+                                                                       prop.getDelim(), 
+                                                                       prop.getNAStrings(), 
+                                                                       specFile, 
+                                                                       numCols, 
+                                                                       headerLine), 
+                                       true );
+               
+               // Shuffle to group by DistinctValue
+               JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey();
+               
+               // Construct transformation metadata (Reduce-side)
+               // Note: logic is similar to GTFMTDReducer
+               JavaRDD<Long> out 
+                       = rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(), 
+                                                                                               prop.getDelim(), 
+                                                                                               prop.getNAStrings(), 
+                                                                                               headerLine, 
+                                                                                               tfMtdPath, 
+                                                                                               partOffsetsFile, 
+                                                                                               specFile, 
+                                                                                               numCols)  );
+               
+               // Compute the total number of transformed rows
+               long numRows = out.reduce(new Function2<Long,Long,Long>() {
+                       private static final long serialVersionUID = 1263336168859959795L;
+
+                       @Override
+                       public Long call(Long v1, Long v2) throws Exception {
+                               return v1+v2;
+                       }
+                       
+               });
+               
+               return numRows;
+       }
+       
+       // ----------------------------------------------------------------------------------------------------------------------
+       
+       public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> {
+
+               private static final long serialVersionUID = -5622745445470598215L;
+               
+               TfUtils _agents = null;
+               
+               GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException {
+                       
+                       // Setup Transformation Agents
+                       JobConf job = new JobConf();
+                       FileSystem fs = FileSystem.get(job);
+                       String[] nas = TfUtils.parseNAStrings(naStrings);
+                       
+                       JSONObject spec = TfUtils.readSpec(fs, specFile);
+                       _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null);
+
+               }
+               
+               @Override
+               public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID,
+                               Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception {
+                       
+                       // Construct transformation metadata by looping through csvLines
+                       // Note: logic is similar to GTFMTDMapper
+                       
+                       boolean first = true;
+                       Tuple2<LongWritable, Text> rec = null;
+                       long _offsetInPartFile = -1;
+                       
+                       while(csvLines.hasNext()) {
+                               rec = csvLines.next();
+                               
+                               if (first) {
+                                       first = false;
+                                       _offsetInPartFile = rec._1().get();
+                                       
+                                       if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 )
+                                               continue; // skip the header line
+                               }
+                               
+                               _agents.prepareTfMtd(rec._2().toString());
+                       }
+                       
+                       // Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated. 
+                       
+                       ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>();
+                       
+                       _agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+                       _agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+                       _agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents);
+                       
+                       DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal()));
+                       Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); 
+                       outList.add(tuple);
+
+                       return outList.iterator();
+               }
+               
+       }
+       
+       // ------------------------------------------------------------------------------------------------
+       
+       public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> {
+               
+               private static final long serialVersionUID = -2733233671193035242L;
+               TfUtils _agents = null;
+               
+               GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException {
+                       String[] nas = TfUtils.parseNAStrings(naStrings); 
+                       FileSystem fs = FileSystem.get(new JobConf());
+
+                       JSONObject spec = TfUtils.readSpec(fs, specFile);
+                       _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t)
+                               throws Exception {
+                       
+                       int colID = t._1();
+                       Iterator<DistinctValue> iterDV = t._2().iterator();
+
+                       JobConf job = new JobConf();
+                       FileSystem fs = FileSystem.get(job);
+                       
+                       ArrayList<Long> numRows = new ArrayList<Long>();
+                       
+                       if(colID < 0) 
+                       {
+                               // process mapper output for MV and Bin agents
+                               colID = colID*-1;
+                               _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
+                               numRows.add(0L);
+                       }
+                       else if ( colID == _agents.getNumCols() + 1)
+                       {
+                               // process mapper output for OFFSET_FILE
+                               ArrayList<OffsetCount> list = new ArrayList<OffsetCount>();
+                               while(iterDV.hasNext())
+                                       list.add(new OffsetCount(iterDV.next().getOffsetCount()));
+                               Collections.sort(list);
+                               
+                               @SuppressWarnings("deprecation")
+                               SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class);
+                               
+                               long lineOffset=0;
+                               for(OffsetCount oc: list)
+                               {
+                                       long count=oc.count;
+                                       oc.count=lineOffset;
+                                       writer.append(new ByteWritable((byte)0), oc);
+                                       lineOffset+=count;
+                               }
+                               writer.close();
+                               list.clear();
+                               
+                               numRows.add(lineOffset);
+                       }
+                       else 
+                       {
+                               // process mapper output for Recode agent
+                               _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents);
+                               numRows.add(0L);
+                       }
+                       
+                       return numRows;
+               }
+
+       }
+
+       
+}
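
For readers new to the Spark path: runSparkJob() mirrors the MR job as mapPartitionsWithIndex -> groupByKey -> flatMap -> reduce. A toy, self-contained sketch of that same pipeline shape with simple types (Spark 1.x Java API, matching the Iterable-returning flatMap above; all names here are illustrative, not SystemML code):

import java.util.Arrays;
import java.util.Collections;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class GroupAndReduceSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setAppName("sketch").setMaster("local[2]"));

        // stand-in for the per-partition metadata emitted by GenTfMtdMap
        JavaRDD<String> tokens =
                jsc.parallelize(Arrays.asList("a", "b", "a", "c", "b"), 2);

        // shuffle to group values by key, like the groupByKey over column IDs
        JavaPairRDD<String, Iterable<Integer>> grouped =
                tokens.mapToPair(s -> new Tuple2<>(s, 1)).groupByKey();

        // reduce-side: emit one count per group, like GenTfMtdReduce
        JavaRDD<Long> perGroup = grouped.flatMap(t -> {
            long n = 0;
            for (int one : t._2()) n += one;
            return Collections.singletonList(n); // Spark 1.x flatMap returns an Iterable
        });

        // total across groups, like the final out.reduce(...) summation
        long total = perGroup.reduce((a, b) -> a + b);
        System.out.println("total = " + total); // 5

        jsc.stop();
    }
}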
