http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java index 1a646cf..2e3fd75 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java +++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java @@ -1,124 +1,124 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.wink.json4j.JSONException; - -import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; - - -public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> { - - private JobConf _rJob = null; - TfUtils _agents = null; - - @Override - public void configure(JobConf job) { - _rJob = job; - - try { - String outputDir = MRJobConfiguration.getOutputs(job)[0]; - _agents = new TfUtils(job, outputDir); - } - catch(IOException e) { throw new RuntimeException(e); } - catch(JSONException e) { throw new RuntimeException(e); } - } - - @Override - public void close() throws IOException { - } - - @Override - public void reduce(IntWritable key, Iterator<DistinctValue> values, - OutputCollector<Text, LongWritable> output, Reporter reporter) - throws IOException { - - FileSystem fs = FileSystem.get(_rJob); - - int colID = key.get(); - - if(colID < 0) - { - // process mapper output for MV and Bin agents - colID = colID*-1; - _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents); - } - else if ( colID == _agents.getNumCols() + 1) - { - // process mapper output for OFFSET_FILE - ArrayList<OffsetCount> list = new ArrayList<OffsetCount>(); - while(values.hasNext()) - list.add(new OffsetCount(values.next().getOffsetCount())); - - long numTfRows = generateOffsetsFile(list); - reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, 
numTfRows); - - } - else - { - // process mapper output for Recode agent - _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents); - } - - } - - @SuppressWarnings("unchecked") - private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException - { - Collections.sort(list); - - @SuppressWarnings("deprecation") - SequenceFile.Writer writer = new SequenceFile.Writer( - FileSystem.get(_rJob), _rJob, - new Path(_agents.getOffsetFile()+"/part-00000"), - ByteWritable.class, OffsetCount.class); - - long lineOffset=0; - for(OffsetCount oc: list) - { - long count=oc.count; - oc.count=lineOffset; - writer.append(new ByteWritable((byte)0), oc); - lineOffset+=count; - } - writer.close(); - list.clear(); - - return lineOffset; - } - -} - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.wink.json4j.JSONException; + +import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; +import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; + + +public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> { + + private JobConf _rJob = null; + TfUtils _agents = null; + + @Override + public void configure(JobConf job) { + _rJob = job; + + try { + String outputDir = MRJobConfiguration.getOutputs(job)[0]; + _agents = new TfUtils(job, outputDir); + } + catch(IOException e) { throw new RuntimeException(e); } + catch(JSONException e) { throw new RuntimeException(e); } + } + + @Override + public void close() throws IOException { + } + + @Override + public void reduce(IntWritable key, Iterator<DistinctValue> values, + OutputCollector<Text, LongWritable> output, Reporter reporter) + throws IOException { + + FileSystem fs = FileSystem.get(_rJob); + + int colID = key.get(); + + if(colID < 0) + { + // process mapper output for MV and Bin agents + colID = colID*-1; + _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents); + } + else if ( colID == _agents.getNumCols() + 1) + { + // process mapper output for OFFSET_FILE + 
ArrayList<OffsetCount> list = new ArrayList<OffsetCount>(); + while(values.hasNext()) + list.add(new OffsetCount(values.next().getOffsetCount())); + + long numTfRows = generateOffsetsFile(list); + reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows); + + } + else + { + // process mapper output for Recode agent + _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents); + } + + } + + @SuppressWarnings("unchecked") + private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException + { + Collections.sort(list); + + @SuppressWarnings("deprecation") + SequenceFile.Writer writer = new SequenceFile.Writer( + FileSystem.get(_rJob), _rJob, + new Path(_agents.getOffsetFile()+"/part-00000"), + ByteWritable.class, OffsetCount.class); + + long lineOffset=0; + for(OffsetCount oc: list) + { + long count=oc.count; + oc.count=lineOffset; + writer.append(new ByteWritable((byte)0), oc); + lineOffset+=count; + } + writer.close(); + list.clear(); + + return lineOffset; + } + +} +
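
[Editor's note] The heart of GTFMTDReducer above is generateOffsetsFile(): it sorts the per-split OffsetCount records and rewrites each record's count into a running prefix sum, so every split learns the global row index at which it starts, and the running total becomes the TRANSFORMED_NUM_ROWS counter. The self-contained sketch below isolates that logic. "Record" is a hypothetical stand-in for CSVReblockMR.OffsetCount (the assumed (file, byteOffset) sort order is hedged, not confirmed), and the SequenceFile append is replaced by an in-place list update to keep the example runnable.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Sketch of the prefix-sum rewrite in GTFMTDReducer.generateOffsetsFile().
    // "Record" is a hypothetical stand-in for CSVReblockMR.OffsetCount; the
    // real code appends (ByteWritable, OffsetCount) pairs to a SequenceFile
    // at <offsetFile>/part-00000 instead of mutating a list in place.
    public class OffsetPrefixSum {

        static class Record implements Comparable<Record> {
            final String file;     // part-file name, e.g. "Partition0"
            final long byteOffset; // byte offset of the split within the file
            long count;            // in: rows in this split; out: global starting row

            Record(String file, long byteOffset, long count) {
                this.file = file;
                this.byteOffset = byteOffset;
                this.count = count;
            }

            @Override
            public int compareTo(Record o) { // assumed sort key: (file, byteOffset)
                int c = file.compareTo(o.file);
                return (c != 0) ? c : Long.compare(byteOffset, o.byteOffset);
            }
        }

        // Mirrors generateOffsetsFile(): after sorting, each record's count is
        // replaced by the number of rows preceding its split globally, and the
        // grand total (the number of transformed rows) is returned.
        static long assignGlobalOffsets(List<Record> list) {
            Collections.sort(list);
            long lineOffset = 0;
            for (Record r : list) {
                long count = r.count; // rows contributed by this split
                r.count = lineOffset; // split now knows its global starting row
                lineOffset += count;
            }
            return lineOffset;        // == total rows across all splits
        }

        public static void main(String[] args) {
            List<Record> recs = new ArrayList<>();
            recs.add(new Record("Partition1", 0, 40));
            recs.add(new Record("Partition0", 0, 100));
            long total = assignGlobalOffsets(recs);
            // Partition0 starts at global row 0, Partition1 at row 100; total == 140
            System.out.println(total + " rows; Partition1 starts at row " + recs.get(1).count);
        }
    }
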
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java index b1e79dd..09b9148 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java +++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java @@ -1,106 +1,106 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.lib.NullOutputFormat; - -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; - -/** - * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format). - * - */ - -public class GenTfMtdMR { - - public static final String DELIM = ","; - - public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException { - JobConf job = new JobConf(GenTfMtdMR.class); - job.setJobName("GenTfMTD"); - - /* Setup MapReduce Job */ - job.setJarByClass(GenTfMtdMR.class); - - // set relevant classes - job.setMapperClass(GTFMTDMapper.class); - job.setReducerClass(GTFMTDReducer.class); - - // set input and output properties - job.setInputFormat(TextInputFormat.class); - job.setOutputFormat(NullOutputFormat.class); - - job.setMapOutputKeyClass(IntWritable.class); - job.setMapOutputValueClass(DistinctValue.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - - job.setInt("dfs.replication", replication); - - FileInputFormat.addInputPath(job, new Path(inputPath)); - // delete outputPath, if exists already. 
- Path outPath = new Path(txMtdPath); - FileSystem fs = FileSystem.get(job); - fs.delete(outPath, true); - FileOutputFormat.setOutputPath(job, outPath); - - job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader())); - job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim()); - if ( inputDataProperties.getNAStrings() != null) - // Adding "dummy" string to handle the case of na_strings = "" - job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) ); - job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs); - job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile); - job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols); - job.set(MRJobConfiguration.TF_HEADER, headerLine); - - job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath); - - // offsets file to store part-file names and offsets for each input split - job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile); - - //turn off adaptivemr - job.setBoolean("adaptivemr.map.enable", false); - - // Run the job - RunningJob runjob = JobClient.runJob(job); - - Counters c = runjob.getCounters(); - long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter(); - - return tx_numRows; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.lib.NullOutputFormat; + +import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; +import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; + +/** + * MR Job to Generate Transform Metadata based on a given transformation specification file (JSON format). 
+ * + */ + +public class GenTfMtdMR { + + public static final String DELIM = ","; + + public static long runJob(String inputPath, String txMtdPath, String specFileWithIDs, String smallestFile, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException { + JobConf job = new JobConf(GenTfMtdMR.class); + job.setJobName("GenTfMTD"); + + /* Setup MapReduce Job */ + job.setJarByClass(GenTfMtdMR.class); + + // set relevant classes + job.setMapperClass(GTFMTDMapper.class); + job.setReducerClass(GTFMTDReducer.class); + + // set input and output properties + job.setInputFormat(TextInputFormat.class); + job.setOutputFormat(NullOutputFormat.class); + + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(DistinctValue.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + + job.setInt("dfs.replication", replication); + + FileInputFormat.addInputPath(job, new Path(inputPath)); + // delete outputPath, if exists already. + Path outPath = new Path(txMtdPath); + FileSystem fs = FileSystem.get(job); + fs.delete(outPath, true); + FileOutputFormat.setOutputPath(job, outPath); + + job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader())); + job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim()); + if ( inputDataProperties.getNAStrings() != null) + // Adding "dummy" string to handle the case of na_strings = "" + job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) ); + job.set(MRJobConfiguration.TF_SPEC_FILE, specFileWithIDs); + job.set(MRJobConfiguration.TF_SMALLEST_FILE, smallestFile); + job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols); + job.set(MRJobConfiguration.TF_HEADER, headerLine); + + job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, txMtdPath); + + // offsets file to store part-file names and offsets for each input split + job.set(MRJobConfiguration.TF_OFFSETS_FILE, partOffsetsFile); + + //turn off adaptivemr + job.setBoolean("adaptivemr.map.enable", false); + + // Run the job + RunningJob runjob = JobClient.runJob(job); + + Counters c = runjob.getCounters(); + long tx_numRows = c.findCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS).getCounter(); + + return tx_numRows; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java index 6b811ef..e0644ff 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java +++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java @@ -1,235 +1,235 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; - -import scala.Tuple2; - -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; - -public class GenTfMtdSPARK { - - /** - * Spark code to Generate Transform Metadata based on the given transformation - * specification file (JSON format). - * - */ - - public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, - String tfMtdPath, String specFile, - String partOffsetsFile, CSVFileFormatProperties prop, - long numCols, String headerLine - ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException { - - // Construct transformation metadata (map-side) - // Note: logic is similar to GTFMTDMapper - JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput - = inputRDD.mapPartitionsWithIndex( - new GenTfMtdMap(prop.hasHeader(), - prop.getDelim(), - prop.getNAStrings(), - specFile, - numCols, - headerLine), - true ); - - // Shuffle to group by DistinctValue - JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey(); - - // Construct transformation metadata (Reduce-side) - // Note: logic is similar to GTFMTDReducer - JavaRDD<Long> out - = rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(), - prop.getDelim(), - prop.getNAStrings(), - headerLine, - tfMtdPath, - partOffsetsFile, - specFile, - numCols) ); - - // Compute the total number of transformed rows - long numRows = out.reduce(new Function2<Long,Long,Long>() { - private static final long serialVersionUID = 1263336168859959795L; - - @Override - public Long call(Long v1, Long v2) throws Exception { - return v1+v2; - } - - }); - - return numRows; - } - - // ---------------------------------------------------------------------------------------------------------------------- - - public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> { - - private static final long serialVersionUID = -5622745445470598215L; - - TfUtils _agents = null; - - GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException { - - // Setup Transformation 
Agents - JobConf job = new JobConf(); - FileSystem fs = FileSystem.get(job); - String[] nas = TfUtils.parseNAStrings(naStrings); - - JSONObject spec = TfUtils.readSpec(fs, specFile); - _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null); - - } - - @Override - public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID, - Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception { - - // Construct transformation metadata by looping through csvLines - // Note: logic is similar to GTFMTDMapper - - boolean first = true; - Tuple2<LongWritable, Text> rec = null; - long _offsetInPartFile = -1; - - while(csvLines.hasNext()) { - rec = csvLines.next(); - - if (first) { - first = false; - _offsetInPartFile = rec._1().get(); - - if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 ) - continue; // skip the header line - } - - _agents.prepareTfMtd(rec._2().toString()); - } - - // Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated. - - ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>(); - - _agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); - _agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); - _agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); - - DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal())); - Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); - outList.add(tuple); - - return outList.iterator(); - } - - } - - // ------------------------------------------------------------------------------------------------ - - public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> { - - private static final long serialVersionUID = -2733233671193035242L; - TfUtils _agents = null; - - GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException { - String[] nas = TfUtils.parseNAStrings(naStrings); - FileSystem fs = FileSystem.get(new JobConf()); - - JSONObject spec = TfUtils.readSpec(fs, specFile); - _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null); - } - - @SuppressWarnings("unchecked") - @Override - public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t) - throws Exception { - - int colID = t._1(); - Iterator<DistinctValue> iterDV = t._2().iterator(); - - JobConf job = new JobConf(); - FileSystem fs = FileSystem.get(job); - - ArrayList<Long> numRows = new ArrayList<Long>(); - - if(colID < 0) - { - // process mapper output for MV and Bin agents - colID = colID*-1; - _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents); - numRows.add(0L); - } - else if ( colID == _agents.getNumCols() + 1) - { - // process mapper output for OFFSET_FILE - ArrayList<OffsetCount> list = new ArrayList<OffsetCount>(); - while(iterDV.hasNext()) - list.add(new OffsetCount(iterDV.next().getOffsetCount())); - Collections.sort(list); - - @SuppressWarnings("deprecation") - SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class); - - long 
lineOffset=0; - for(OffsetCount oc: list) - { - long count=oc.count; - oc.count=lineOffset; - writer.append(new ByteWritable((byte)0), oc); - lineOffset+=count; - } - writer.close(); - list.clear(); - - numRows.add(lineOffset); - } - else - { - // process mapper output for Recode agent - _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents); - numRows.add(0L); - } - - return numRows; - } - - } - - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; + +import scala.Tuple2; + +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; +import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; + +public class GenTfMtdSPARK { + + /** + * Spark code to Generate Transform Metadata based on the given transformation + * specification file (JSON format). 
+ * + */ + + public static long runSparkJob(SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, + String tfMtdPath, String specFile, + String partOffsetsFile, CSVFileFormatProperties prop, + long numCols, String headerLine + ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException { + + // Construct transformation metadata (map-side) + // Note: logic is similar to GTFMTDMapper + JavaRDD<Tuple2<Integer,DistinctValue>> tfMapOutput + = inputRDD.mapPartitionsWithIndex( + new GenTfMtdMap(prop.hasHeader(), + prop.getDelim(), + prop.getNAStrings(), + specFile, + numCols, + headerLine), + true ); + + // Shuffle to group by DistinctValue + JavaPairRDD<Integer,Iterable<DistinctValue>> rdd = JavaPairRDD.fromJavaRDD(tfMapOutput).groupByKey(); + + // Construct transformation metadata (Reduce-side) + // Note: logic is similar to GTFMTDReducer + JavaRDD<Long> out + = rdd.flatMap(new GenTfMtdReduce(prop.hasHeader(), + prop.getDelim(), + prop.getNAStrings(), + headerLine, + tfMtdPath, + partOffsetsFile, + specFile, + numCols) ); + + // Compute the total number of transformed rows + long numRows = out.reduce(new Function2<Long,Long,Long>() { + private static final long serialVersionUID = 1263336168859959795L; + + @Override + public Long call(Long v1, Long v2) throws Exception { + return v1+v2; + } + + }); + + return numRows; + } + + // ---------------------------------------------------------------------------------------------------------------------- + + public static class GenTfMtdMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<Integer,DistinctValue>>> { + + private static final long serialVersionUID = -5622745445470598215L; + + TfUtils _agents = null; + + GenTfMtdMap(boolean hasHeader, String delim, String naStrings, String specFile, long numCols, String headerLine) throws IllegalArgumentException, IOException, JSONException { + + // Setup Transformation Agents + JobConf job = new JobConf(); + FileSystem fs = FileSystem.get(job); + String[] nas = TfUtils.parseNAStrings(naStrings); + + JSONObject spec = TfUtils.readSpec(fs, specFile); + _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, null, null, null); + + } + + @Override + public Iterator<Tuple2<Integer,DistinctValue>> call(Integer partitionID, + Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception { + + // Construct transformation metadata by looping through csvLines + // Note: logic is similar to GTFMTDMapper + + boolean first = true; + Tuple2<LongWritable, Text> rec = null; + long _offsetInPartFile = -1; + + while(csvLines.hasNext()) { + rec = csvLines.next(); + + if (first) { + first = false; + _offsetInPartFile = rec._1().get(); + + if (partitionID == 0 && _agents.hasHeader() && _offsetInPartFile == 0 ) + continue; // skip the header line + } + + _agents.prepareTfMtd(rec._2().toString()); + } + + // Prepare the output in the form of DistinctValues, which subsequently need to be grouped and aggregated. 
+ + ArrayList<Tuple2<Integer,DistinctValue>> outList = new ArrayList<Tuple2<Integer,DistinctValue>>(); + + _agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); + _agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); + _agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, _agents); + + DistinctValue dv = new DistinctValue(new OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal())); + Tuple2<Integer, DistinctValue> tuple = new Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); + outList.add(tuple); + + return outList.iterator(); + } + + } + + // ------------------------------------------------------------------------------------------------ + + public static class GenTfMtdReduce implements FlatMapFunction<Tuple2<Integer, Iterable<DistinctValue>>, Long> { + + private static final long serialVersionUID = -2733233671193035242L; + TfUtils _agents = null; + + GenTfMtdReduce(boolean hasHeader, String delim, String naStrings, String headerLine, String tfMtdDir, String offsetFile, String specFile, long numCols) throws IOException, JSONException { + String[] nas = TfUtils.parseNAStrings(naStrings); + FileSystem fs = FileSystem.get(new JobConf()); + + JSONObject spec = TfUtils.readSpec(fs, specFile); + _agents = new TfUtils(headerLine, hasHeader, delim, nas, spec, numCols, tfMtdDir, offsetFile, null); + } + + @SuppressWarnings("unchecked") + @Override + public Iterable<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t) + throws Exception { + + int colID = t._1(); + Iterator<DistinctValue> iterDV = t._2().iterator(); + + JobConf job = new JobConf(); + FileSystem fs = FileSystem.get(job); + + ArrayList<Long> numRows = new ArrayList<Long>(); + + if(colID < 0) + { + // process mapper output for MV and Bin agents + colID = colID*-1; + _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents); + numRows.add(0L); + } + else if ( colID == _agents.getNumCols() + 1) + { + // process mapper output for OFFSET_FILE + ArrayList<OffsetCount> list = new ArrayList<OffsetCount>(); + while(iterDV.hasNext()) + list.add(new OffsetCount(iterDV.next().getOffsetCount())); + Collections.sort(list); + + @SuppressWarnings("deprecation") + SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class); + + long lineOffset=0; + for(OffsetCount oc: list) + { + long count=oc.count; + oc.count=lineOffset; + writer.append(new ByteWritable((byte)0), oc); + lineOffset+=count; + } + writer.close(); + list.clear(); + + numRows.add(lineOffset); + } + else + { + // process mapper output for Recode agent + _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(iterDV, _agents.getTfMtdDir(), colID, fs, _agents); + numRows.add(0L); + } + + return numRows; + } + + } + + +}
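
[Editor's note] GTFMTDReducer.reduce() and GenTfMtdSPARK.GenTfMtdReduce.call() share one key-encoding convention for routing grouped DistinctValues: a negative key carries MV-impute/bin metadata for column -colID, the reserved key numCols+1 carries the per-split OffsetCount records, and any other positive key carries recode metadata for that column. In the Spark variant, each group additionally emits a Long (0 for metadata groups, the total row count for the offsets group), which the final rdd.reduce sums into the transformed row count. A minimal sketch of that dispatch follows; routeKey() is a hypothetical helper that returns a description where the real code invokes the corresponding agent.

    // Sketch of the integer-key convention shared by the MR reducer and the
    // Spark reduce step above. routeKey() is a hypothetical helper; the real
    // code dispatches to MVImputeAgent, RecodeAgent, or the offsets writer.
    public class TfKeyRouting {

        static String routeKey(int colID, long numCols) {
            if (colID < 0) {
                // map side negates the column ID for MV-impute/bin metadata
                return "MV/Bin metadata, column " + (-colID);
            }
            else if (colID == numCols + 1) {
                // one reserved key past the last column carries the per-split
                // OffsetCount records that become the offsets file
                return "offsets file (per-split row counts)";
            }
            else {
                // ordinary column IDs carry recode (distinct-value) metadata
                return "recode metadata, column " + colID;
            }
        }

        public static void main(String[] args) {
            long numCols = 5;
            System.out.println(routeKey(-3, numCols)); // MV/Bin metadata, column 3
            System.out.println(routeKey(6, numCols));  // offsets file (per-split row counts)
            System.out.println(routeKey(2, numCols));  // recode metadata, column 2
        }
    }
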

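[Editor's note] For orientation, here is a hypothetical driver-side invocation of GenTfMtdMR.runJob(), matching the signature shown in the GenTfMtdMR.java diff above. Every path and literal is illustrative only, and the three-argument CSVFileFormatProperties constructor is an assumption about that class, not confirmed API.

    import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
    import org.apache.sysml.runtime.transform.GenTfMtdMR;

    // Hypothetical driver for GenTfMtdMR.runJob(); all paths are illustrative,
    // and the CSVFileFormatProperties(hasHeader, delim, fill) constructor form
    // is an assumption.
    public class GenTfMtdDriver {
        public static void main(String[] args) throws Exception {
            CSVFileFormatProperties csvProps =
                new CSVFileFormatProperties(true, ",", false);

            long numTfRows = GenTfMtdMR.runJob(
                "hdfs:///tmp/data.csv",            // inputPath
                "hdfs:///tmp/tfmtd",               // txMtdPath (deleted first if it exists)
                "hdfs:///tmp/spec.ids.json",       // specFileWithIDs
                "hdfs:///tmp/data.csv/part-00000", // smallestFile
                "hdfs:///tmp/offsets",             // partOffsetsFile
                csvProps,                          // input CSV properties
                3L,                                // numCols
                1,                                 // replication
                "id,name,age");                    // headerLine (pre-read first line)
            System.out.println("transformed rows: " + numTfRows);
        }
    }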