http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java index 0c0d399..7fb1ccc 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java @@ -1,112 +1,112 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.wink.json4j.JSONException; - -import org.apache.sysml.runtime.DMLRuntimeException; - -public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> { - - boolean _firstRecordInSplit = true; - boolean _partFileWithHeader = false; - - TfUtils tfmapper = null; - Reporter _reporter = null; - BufferedWriter br = null; - JobConf _rJob = null; - - @Override - public void configure(JobConf job) { - try { - _rJob = job; - _partFileWithHeader = TfUtils.isPartFileWithHeader(job); - tfmapper = new TfUtils(job); - - tfmapper.loadTfMetadata(job, true); - - } catch (IOException e) { throw new RuntimeException(e); } - catch(JSONException e) { throw new RuntimeException(e); } - - } - - @Override - public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException { - - if(_firstRecordInSplit) - { - _firstRecordInSplit = false; - _reporter = reporter; - - // generate custom output paths so that order of rows in the - // output (across part files) matches w/ that from input data set - String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get()); - Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix); - - // setup the writer for mapper's output - // the default part-..... 
files will be deleted later once the job finishes - br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true))); - } - - // output the header line - if ( rawKey.get() == 0 && _partFileWithHeader ) - { - _reporter = reporter; - tfmapper.processHeaderLine(); - if ( tfmapper.hasHeader() ) - return; - } - - // parse the input line and apply transformation - String[] words = tfmapper.getWords(rawValue); - - if(!tfmapper.omit(words)) - { - try { - words = tfmapper.apply(words); - String outStr = tfmapper.checkAndPrepOutputString(words); - //out.collect(NullWritable.get(), new Text(outStr)); - br.write(outStr + "\n"); - } - catch(DMLRuntimeException e) { - throw new RuntimeException(e.getMessage() + ": " + rawValue.toString()); - } - } - } - - @Override - public void close() throws IOException { - if ( br != null ) - br.close(); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.wink.json4j.JSONException; + +import org.apache.sysml.runtime.DMLRuntimeException; + +public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> { + + boolean _firstRecordInSplit = true; + boolean _partFileWithHeader = false; + + TfUtils tfmapper = null; + Reporter _reporter = null; + BufferedWriter br = null; + JobConf _rJob = null; + + @Override + public void configure(JobConf job) { + try { + _rJob = job; + _partFileWithHeader = TfUtils.isPartFileWithHeader(job); + tfmapper = new TfUtils(job); + + tfmapper.loadTfMetadata(job, true); + + } catch (IOException e) { throw new RuntimeException(e); } + catch(JSONException e) { throw new RuntimeException(e); } + + } + + @Override + public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException { + + if(_firstRecordInSplit) + { + _firstRecordInSplit = false; + _reporter = reporter; + + // generate custom output paths so that order of rows in the + // output (across part files) matches w/ that from input data set + String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get()); + Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix); + + // setup the writer for mapper's output + // the default part-..... 
files will be deleted later once the job finishes + br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true))); + } + + // output the header line + if ( rawKey.get() == 0 && _partFileWithHeader ) + { + _reporter = reporter; + tfmapper.processHeaderLine(); + if ( tfmapper.hasHeader() ) + return; + } + + // parse the input line and apply transformation + String[] words = tfmapper.getWords(rawValue); + + if(!tfmapper.omit(words)) + { + try { + words = tfmapper.apply(words); + String outStr = tfmapper.checkAndPrepOutputString(words); + //out.collect(NullWritable.get(), new Text(outStr)); + br.write(outStr + "\n"); + } + catch(DMLRuntimeException e) { + throw new RuntimeException(e.getMessage() + ": " + rawValue.toString()); + } + } + } + + @Override + public void close() throws IOException { + if ( br != null ) + br.close(); + } + +}
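
[Editor's note] ApplyTfCSVMapper above bypasses the standard OutputCollector and writes each split's transformed rows directly to a file named "transform-part-<suffix>", where the suffix is derived from the split's starting byte offset, so row order across part files matches the input. The following is a minimal, hypothetical sketch (not part of this commit) of that per-split writer pattern using plain Hadoop FileSystem APIs; the output directory and the zero-padded suffix scheme are assumptions for illustration only.

    // Hypothetical sketch: write one output file per input split so that
    // row order across part files mirrors the order of the input splits.
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerSplitWriterSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);          // local FS unless configured otherwise

            long splitStartOffset = 0L;                    // byte offset of the split's first record
            String suffix = String.format("%05d", splitStartOffset); // assumed naming scheme
            Path out = new Path("/tmp/transform-out/transform-part-" + suffix);

            // overwrite an existing file, mirroring fs.create(path, true) in the mapper
            BufferedWriter br = new BufferedWriter(
                    new OutputStreamWriter(fs.create(out, true)));
            try {
                br.write("1.0,2.0,3.0\n");                 // one transformed CSV row per line
            } finally {
                br.close();
            }
        }
    }

The default part-xxxxx files produced by the MapReduce framework are then deleted once the job finishes, leaving only the custom transform-part-* files.
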
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java index 693d687..061f2e3 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java @@ -1,160 +1,160 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.broadcast.Broadcast; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; - -import scala.Tuple2; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; - - -public class ApplyTfCSVSPARK { - - /** - * Apply transformation metadata and generate the result in CSV format, as a - * JavaRDD of Strings. 
- */ - - public static JavaPairRDD<Long, String> runSparkJob( - SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, - String tfMtdPath, String specFile, - String tmpPath, CSVFileFormatProperties prop, - int numCols, String headerLine - ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException { - - // Load transformation metadata and broadcast it - JobConf job = new JobConf(); - FileSystem fs = FileSystem.get(job); - - String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings()); - JSONObject spec = TfUtils.readSpec(fs, specFile); - TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath); - - _tfmapper.loadTfMetadata(); - - Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper); - - /* - * Construct transformation metadata (map-side) -- the logic is similar - * to GTFMTDMapper - * - * Note: The result of mapPartitionsWithIndex is cached so that the - * transformed data is not redundantly computed multiple times - */ - JavaPairRDD<Long, String> applyRDD = inputRDD - .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf), true) - .mapToPair( - new PairFunction<String,Long,String>(){ - private static final long serialVersionUID = 3868143093999082931L; - @Override - public Tuple2<Long, String> call(String t) throws Exception { - return new Tuple2<Long, String>(new Long(1), t); - } - } - ).cache(); - - /* - * An action to force execution of apply() - * - * We need to trigger the execution of this RDD so as to ensure the - * creation of a few metadata files (headers, dummycoded information, - * etc.), which are referenced in the caller function. - */ - applyRDD.count(); - - return applyRDD; - } - - public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> { - - private static final long serialVersionUID = 1496686437276906911L; - - TfUtils _tfmapper = null; - - ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException { - _tfmapper = tf.getValue(); - } - - ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException { - _tfmapper = tf.getValue(); - } - - @Override - public Iterator<String> call(Integer partitionID, - Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception { - - boolean first = true; - Tuple2<LongWritable, Text> rec = null; - ArrayList<String> outLines = new ArrayList<String>(); - - while(csvLines.hasNext()) { - rec = csvLines.next(); - - if (first && partitionID == 0) { - first = false; - - _tfmapper.processHeaderLine(); - - if (_tfmapper.hasHeader() ) { - //outLines.add(dcdHeader); // if the header needs to be preserved in the output file - continue; - } - } - - // parse the input line and apply transformation - - String[] words = _tfmapper.getWords(rec._2()); - - if(!_tfmapper.omit(words)) - { - try { - words = _tfmapper.apply(words); - String outStr = _tfmapper.checkAndPrepOutputString(words); - outLines.add(outStr); - } - catch(DMLRuntimeException e) { - throw new RuntimeException(e.getMessage() + ": " + rec._2().toString()); - } - } - } - - return outLines.iterator(); - } - - } - - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; + +import scala.Tuple2; + +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; + + +public class ApplyTfCSVSPARK { + + /** + * Apply transformation metadata and generate the result in CSV format, as a + * JavaRDD of Strings. + */ + + public static JavaPairRDD<Long, String> runSparkJob( + SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, + String tfMtdPath, String specFile, + String tmpPath, CSVFileFormatProperties prop, + int numCols, String headerLine + ) throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException { + + // Load transformation metadata and broadcast it + JobConf job = new JobConf(); + FileSystem fs = FileSystem.get(job); + + String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings()); + JSONObject spec = TfUtils.readSpec(fs, specFile); + TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, spec, numCols, tfMtdPath, null, tmpPath); + + _tfmapper.loadTfMetadata(); + + Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper); + + /* + * Construct transformation metadata (map-side) -- the logic is similar + * to GTFMTDMapper + * + * Note: The result of mapPartitionsWithIndex is cached so that the + * transformed data is not redundantly computed multiple times + */ + JavaPairRDD<Long, String> applyRDD = inputRDD + .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf), true) + .mapToPair( + new PairFunction<String,Long,String>(){ + private static final long serialVersionUID = 3868143093999082931L; + @Override + public Tuple2<Long, String> call(String t) throws Exception { + return new Tuple2<Long, String>(new Long(1), t); + } + } + ).cache(); + + /* + * An action to force execution of apply() + * + * We need to trigger the execution of this RDD so as to ensure the + * creation of a few metadata files (headers, dummycoded information, + * etc.), which are referenced in the caller function. 
+ */ + applyRDD.count(); + + return applyRDD; + } + + public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> { + + private static final long serialVersionUID = 1496686437276906911L; + + TfUtils _tfmapper = null; + + ApplyTfCSVMap(boolean hasHeader, String delim, String naStrings, String specFile, String tmpPath, String tfMtdPath, long numCols, String headerLine, Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException { + _tfmapper = tf.getValue(); + } + + ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException { + _tfmapper = tf.getValue(); + } + + @Override + public Iterator<String> call(Integer partitionID, + Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception { + + boolean first = true; + Tuple2<LongWritable, Text> rec = null; + ArrayList<String> outLines = new ArrayList<String>(); + + while(csvLines.hasNext()) { + rec = csvLines.next(); + + if (first && partitionID == 0) { + first = false; + + _tfmapper.processHeaderLine(); + + if (_tfmapper.hasHeader() ) { + //outLines.add(dcdHeader); // if the header needs to be preserved in the output file + continue; + } + } + + // parse the input line and apply transformation + + String[] words = _tfmapper.getWords(rec._2()); + + if(!_tfmapper.omit(words)) + { + try { + words = _tfmapper.apply(words); + String outStr = _tfmapper.checkAndPrepOutputString(words); + outLines.add(outStr); + } + catch(DMLRuntimeException e) { + throw new RuntimeException(e.getMessage() + ": " + rec._2().toString()); + } + } + } + + return outLines.iterator(); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java index f08c9ff..b61c781 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java @@ -1,355 +1,355 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.transform; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.nio.charset.CharacterCodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.wink.json4j.JSONArray; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; - -import scala.Tuple2; - -import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; -import org.apache.sysml.runtime.util.UtilFunctions; - -public class BinAgent extends TransformationAgent { - - private static final long serialVersionUID = 1917445005206076078L; - - public static final String MIN_PREFIX = "min"; - public static final String MAX_PREFIX = "max"; - public static final String NBINS_PREFIX = "nbins"; - - private int[] _binList = null; - //private byte[] _binMethodList = null; // Not used, since only equi-width is supported for now. - private int[] _numBins = null; - - private double[] _min=null, _max=null; // min and max among non-missing values - - private double[] _binWidths = null; // width of a bin for each attribute - - BinAgent() { } - - BinAgent(JSONObject parsedSpec) throws JSONException { - - if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) ) - return; - - JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString()); - - JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); - //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD); - JSONArray nbins = (JSONArray) obj.get(JSON_NBINS); - - assert(attrs.size() == nbins.size()); - - _binList = new int[attrs.size()]; - _numBins = new int[attrs.size()]; - for(int i=0; i < _binList.length; i++) { - _binList[i] = UtilFunctions.toInt(attrs.get(i)); - _numBins[i] = UtilFunctions.toInt(nbins.get(i)); - } - - // initialize internal transformation metadata - _min = new double[_binList.length]; - Arrays.fill(_min, Double.MAX_VALUE); - _max = new double[_binList.length]; - Arrays.fill(_max, -Double.MAX_VALUE); - - _binWidths = new double[_binList.length]; - } - - public void prepare(String[] words, TfUtils agents) { - if ( _binList == null ) - return; - - for(int i=0; i <_binList.length; i++) { - int colID = _binList[i]; - - String w = null; - double d = 0; - - // equi-width - w = UtilFunctions.unquote(words[colID-1].trim()); - if(!agents.isNA(w)) { - d = UtilFunctions.parseToDouble(w); - if(d < _min[i]) - _min[i] = d; - if(d > _max[i]) - _max[i] = d; - } - } - } - - private DistinctValue prepMinOutput(int idx) throws CharacterCodingException { - String s = MIN_PREFIX + Double.toString(_min[idx]); - return new DistinctValue(s, -1L); - } - - private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException { - String s = MAX_PREFIX + Double.toString(_max[idx]); - return new DistinctValue(s, -1L); - } - - private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException { - String s = NBINS_PREFIX + Double.toString(_numBins[idx]); - return new DistinctValue(s, -1L); - } - - /** - * Method to output transformation metadata from the mappers. - * This information is collected and merged by the reducers. 
- * - * @param out - * @throws IOException - */ - @Override - public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { - if ( _binList == null ) - return; - - try { - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; - IntWritable iw = new IntWritable(-colID); - - out.collect(iw, prepMinOutput(i)); - out.collect(iw, prepMaxOutput(i)); - out.collect(iw, prepNBinsOutput(i)); - } - } catch(Exception e) { - throw new IOException(e); - } - } - - public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { - if ( _binList == null ) - return list; - - try { - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; - Integer iw = -colID; - - list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) ); - list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) ); - list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) ); - } - } catch(Exception e) { - throw new IOException(e); - } - return list; - } - - private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException - { - Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n"); - br.close(); - } - - /** - * Method to merge map output transformation metadata. - * - * @param values - * @return - * @throws IOException - */ - @Override - public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { - double min = Double.MAX_VALUE; - double max = -Double.MAX_VALUE; - int nbins = 0; - - DistinctValue val = new DistinctValue(); - String w = null; - double d; - while(values.hasNext()) { - val.reset(); - val = values.next(); - w = val.getWord(); - - if(w.startsWith(MIN_PREFIX)) { - d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() )); - if ( d < min ) - min = d; - } - else if(w.startsWith(MAX_PREFIX)) { - d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() )); - if ( d > max ) - max = d; - } - else if (w.startsWith(NBINS_PREFIX)) { - nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) ); - } - else - throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w); - } - - // write merged metadata - double binwidth = (max-min)/nbins; - writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents); - } - - - public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException { - if(_binList == null) - return; - - MVImputeAgent mvagent = agents.getMVImputeAgent(); - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; - - // If the column is imputed with a constant, then adjust min and max based the value of the constant. 
- if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) - { - double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) ); - if ( cst < _min[i]) - _min[i] = cst; - if ( cst > _max[i]) - _max[i] = cst; - } - - double binwidth = (_max[i] - _min[i])/_numBins[i]; - writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents); - } - } - - // ------------------------------------------------------------------------------------------------ - - public int[] getBinList() { return _binList; } - public int[] getNumBins() { return _numBins; } - public double[] getMin() { return _min; } - public double[] getBinWidths() { return _binWidths; } - - /** - * Method to load transform metadata for all attributes - * - * @param job - * @throws IOException - */ - @Override - public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - if ( _binList == null ) - return; - - if(fs.isDirectory(txMtdDir)) { - for(int i=0; i<_binList.length;i++) { - int colID = _binList[i]; - - Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX); - TfUtils.checkValidInputFile(fs, path, true); - - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); - // format: colID,min,max,nbins - String[] fields = br.readLine().split(TXMTD_SEP); - double min = UtilFunctions.parseToDouble(fields[1]); - //double max = UtilFunctions.parseToDouble(fields[2]); - double binwidth = UtilFunctions.parseToDouble(fields[3]); - int nbins = UtilFunctions.parseToInt(fields[4]); - - _numBins[i] = nbins; - _min[i] = min; - _binWidths[i] = binwidth; // (max-min)/nbins; - - br.close(); - } - } - else { - fs.close(); - throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir); - } - } - - /** - * Method to apply transformations. - * - * @param words - * @return - */ - @Override - public String[] apply(String[] words, TfUtils agents) { - if ( _binList == null ) - return words; - - for(int i=0; i < _binList.length; i++) { - int colID = _binList[i]; - - try { - double val = UtilFunctions.parseToDouble(words[colID-1]); - int binid = 1; - double tmp = _min[i] + _binWidths[i]; - while(val > tmp && binid < _numBins[i]) { - tmp += _binWidths[i]; - binid++; - } - words[colID-1] = Integer.toString(binid); - } catch(NumberFormatException e) - { - throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method."); - } - } - - return words; - } - - /** - * Check if the given column ID is subjected to this transformation. - * - */ - public int isBinned(int colID) - { - if(_binList == null) - return -1; - - int idx = Arrays.binarySearch(_binList, colID); - return ( idx >= 0 ? idx : -1); - } - - - @Override - public void print() { - System.out.print("Binning List (Equi-width): \n "); - for(int i : _binList) { - System.out.print(i + " "); - } - System.out.print("\n "); - for(int b : _numBins) { - System.out.print(b + " "); - } - System.out.println(); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.CharacterCodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.wink.json4j.JSONArray; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; + +import scala.Tuple2; + +import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; +import org.apache.sysml.runtime.util.UtilFunctions; + +public class BinAgent extends TransformationAgent { + + private static final long serialVersionUID = 1917445005206076078L; + + public static final String MIN_PREFIX = "min"; + public static final String MAX_PREFIX = "max"; + public static final String NBINS_PREFIX = "nbins"; + + private int[] _binList = null; + //private byte[] _binMethodList = null; // Not used, since only equi-width is supported for now. 
+ private int[] _numBins = null; + + private double[] _min=null, _max=null; // min and max among non-missing values + + private double[] _binWidths = null; // width of a bin for each attribute + + BinAgent() { } + + BinAgent(JSONObject parsedSpec) throws JSONException { + + if ( !parsedSpec.containsKey(TX_METHOD.BIN.toString()) ) + return; + + JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.BIN.toString()); + + JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); + //JSONArray mthds = (JSONArray) obj.get(JSON_MTHD); + JSONArray nbins = (JSONArray) obj.get(JSON_NBINS); + + assert(attrs.size() == nbins.size()); + + _binList = new int[attrs.size()]; + _numBins = new int[attrs.size()]; + for(int i=0; i < _binList.length; i++) { + _binList[i] = UtilFunctions.toInt(attrs.get(i)); + _numBins[i] = UtilFunctions.toInt(nbins.get(i)); + } + + // initialize internal transformation metadata + _min = new double[_binList.length]; + Arrays.fill(_min, Double.MAX_VALUE); + _max = new double[_binList.length]; + Arrays.fill(_max, -Double.MAX_VALUE); + + _binWidths = new double[_binList.length]; + } + + public void prepare(String[] words, TfUtils agents) { + if ( _binList == null ) + return; + + for(int i=0; i <_binList.length; i++) { + int colID = _binList[i]; + + String w = null; + double d = 0; + + // equi-width + w = UtilFunctions.unquote(words[colID-1].trim()); + if(!agents.isNA(w)) { + d = UtilFunctions.parseToDouble(w); + if(d < _min[i]) + _min[i] = d; + if(d > _max[i]) + _max[i] = d; + } + } + } + + private DistinctValue prepMinOutput(int idx) throws CharacterCodingException { + String s = MIN_PREFIX + Double.toString(_min[idx]); + return new DistinctValue(s, -1L); + } + + private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException { + String s = MAX_PREFIX + Double.toString(_max[idx]); + return new DistinctValue(s, -1L); + } + + private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException { + String s = NBINS_PREFIX + Double.toString(_numBins[idx]); + return new DistinctValue(s, -1L); + } + + /** + * Method to output transformation metadata from the mappers. + * This information is collected and merged by the reducers. 
+ * + * @param out + * @throws IOException + */ + @Override + public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { + if ( _binList == null ) + return; + + try { + for(int i=0; i < _binList.length; i++) { + int colID = _binList[i]; + IntWritable iw = new IntWritable(-colID); + + out.collect(iw, prepMinOutput(i)); + out.collect(iw, prepMaxOutput(i)); + out.collect(iw, prepNBinsOutput(i)); + } + } catch(Exception e) { + throw new IOException(e); + } + } + + public ArrayList<Tuple2<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils agents) throws IOException { + if ( _binList == null ) + return list; + + try { + for(int i=0; i < _binList.length; i++) { + int colID = _binList[i]; + Integer iw = -colID; + + list.add( new Tuple2<Integer,DistinctValue>(iw, prepMinOutput(i)) ); + list.add( new Tuple2<Integer,DistinctValue>(iw, prepMaxOutput(i)) ); + list.add( new Tuple2<Integer,DistinctValue>(iw, prepNBinsOutput(i)) ); + } + } catch(Exception e) { + throw new IOException(e); + } + return list; + } + + private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException + { + Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + BIN_FILE_SUFFIX); + BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); + br.write(colID + TXMTD_SEP + min + TXMTD_SEP + max + TXMTD_SEP + binwidth + TXMTD_SEP + nbins + "\n"); + br.close(); + } + + /** + * Method to merge map output transformation metadata. + * + * @param values + * @return + * @throws IOException + */ + @Override + public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { + double min = Double.MAX_VALUE; + double max = -Double.MAX_VALUE; + int nbins = 0; + + DistinctValue val = new DistinctValue(); + String w = null; + double d; + while(values.hasNext()) { + val.reset(); + val = values.next(); + w = val.getWord(); + + if(w.startsWith(MIN_PREFIX)) { + d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() )); + if ( d < min ) + min = d; + } + else if(w.startsWith(MAX_PREFIX)) { + d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() )); + if ( d > max ) + max = d; + } + else if (w.startsWith(NBINS_PREFIX)) { + nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) ); + } + else + throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w); + } + + // write merged metadata + double binwidth = (max-min)/nbins; + writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents); + } + + + public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException { + if(_binList == null) + return; + + MVImputeAgent mvagent = agents.getMVImputeAgent(); + for(int i=0; i < _binList.length; i++) { + int colID = _binList[i]; + + // If the column is imputed with a constant, then adjust min and max based the value of the constant. 
+ if ( mvagent.isImputed(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) + { + double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) ); + if ( cst < _min[i]) + _min[i] = cst; + if ( cst > _max[i]) + _max[i] = cst; + } + + double binwidth = (_max[i] - _min[i])/_numBins[i]; + writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents); + } + } + + // ------------------------------------------------------------------------------------------------ + + public int[] getBinList() { return _binList; } + public int[] getNumBins() { return _numBins; } + public double[] getMin() { return _min; } + public double[] getBinWidths() { return _binWidths; } + + /** + * Method to load transform metadata for all attributes + * + * @param job + * @throws IOException + */ + @Override + public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { + if ( _binList == null ) + return; + + if(fs.isDirectory(txMtdDir)) { + for(int i=0; i<_binList.length;i++) { + int colID = _binList[i]; + + Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + BIN_FILE_SUFFIX); + TfUtils.checkValidInputFile(fs, path, true); + + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); + // format: colID,min,max,nbins + String[] fields = br.readLine().split(TXMTD_SEP); + double min = UtilFunctions.parseToDouble(fields[1]); + //double max = UtilFunctions.parseToDouble(fields[2]); + double binwidth = UtilFunctions.parseToDouble(fields[3]); + int nbins = UtilFunctions.parseToInt(fields[4]); + + _numBins[i] = nbins; + _min[i] = min; + _binWidths[i] = binwidth; // (max-min)/nbins; + + br.close(); + } + } + else { + fs.close(); + throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir); + } + } + + /** + * Method to apply transformations. + * + * @param words + * @return + */ + @Override + public String[] apply(String[] words, TfUtils agents) { + if ( _binList == null ) + return words; + + for(int i=0; i < _binList.length; i++) { + int colID = _binList[i]; + + try { + double val = UtilFunctions.parseToDouble(words[colID-1]); + int binid = 1; + double tmp = _min[i] + _binWidths[i]; + while(val > tmp && binid < _numBins[i]) { + tmp += _binWidths[i]; + binid++; + } + words[colID-1] = Integer.toString(binid); + } catch(NumberFormatException e) + { + throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method."); + } + } + + return words; + } + + /** + * Check if the given column ID is subjected to this transformation. + * + */ + public int isBinned(int colID) + { + if(_binList == null) + return -1; + + int idx = Arrays.binarySearch(_binList, colID); + return ( idx >= 0 ? 
idx : -1); + } + + + @Override + public void print() { + System.out.print("Binning List (Equi-width): \n "); + for(int i : _binList) { + System.out.print(i + " "); + } + System.out.print("\n "); + for(int b : _numBins) { + System.out.print(b + " "); + } + System.out.println(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java index d44a904..2e52657 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java @@ -1,108 +1,108 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; - -import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; -import org.apache.sysml.runtime.util.UtilFunctions; - -public class DistinctValue implements Writable, Serializable { - - private static final long serialVersionUID = -8236705946336974836L; - - private static final byte [] EMPTY_BYTES = new byte[0]; - - // word (distinct value) - private byte[] _bytes; - private int _length; - // count - private long _count; - - public DistinctValue() { - _bytes = EMPTY_BYTES; - _length = 0; - _count = -1; - } - - public DistinctValue(String w, long count) throws CharacterCodingException { - ByteBuffer bb = Text.encode(w, true); - _bytes = bb.array(); - _length = bb.limit(); - _count = count; - } - - public DistinctValue(OffsetCount oc) throws CharacterCodingException - { - this(oc.filename + "," + oc.fileOffset, oc.count); - } - - public void reset() { - _bytes = EMPTY_BYTES; - _length = 0; - _count = -1; - } - - public String getWord() { return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); } - public long getCount() { return _count; } - - @Override - public void write(DataOutput out) throws IOException { - // write word - WritableUtils.writeVInt(out, _length); - out.write(_bytes, 0, _length); - // write count - out.writeLong(_count); - } - - @Override - public void readFields(DataInput in) throws IOException { - // read word - int newLength = WritableUtils.readVInt(in); - _bytes = new byte[newLength]; - in.readFully(_bytes, 0, newLength); - _length 
= newLength; - if (_length != _bytes.length) - System.out.println("ERROR in DistinctValue.readFields()"); - // read count - _count = in.readLong(); - } - - public OffsetCount getOffsetCount() { - OffsetCount oc = new OffsetCount(); - String[] parts = getWord().split(","); - oc.filename = parts[0]; - oc.fileOffset = UtilFunctions.parseToLong(parts[1]); - oc.count = getCount(); - - return oc; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; +import org.apache.sysml.runtime.util.UtilFunctions; + +public class DistinctValue implements Writable, Serializable { + + private static final long serialVersionUID = -8236705946336974836L; + + private static final byte [] EMPTY_BYTES = new byte[0]; + + // word (distinct value) + private byte[] _bytes; + private int _length; + // count + private long _count; + + public DistinctValue() { + _bytes = EMPTY_BYTES; + _length = 0; + _count = -1; + } + + public DistinctValue(String w, long count) throws CharacterCodingException { + ByteBuffer bb = Text.encode(w, true); + _bytes = bb.array(); + _length = bb.limit(); + _count = count; + } + + public DistinctValue(OffsetCount oc) throws CharacterCodingException + { + this(oc.filename + "," + oc.fileOffset, oc.count); + } + + public void reset() { + _bytes = EMPTY_BYTES; + _length = 0; + _count = -1; + } + + public String getWord() { return new String( _bytes, 0, _length, Charset.forName("UTF-8") ); } + public long getCount() { return _count; } + + @Override + public void write(DataOutput out) throws IOException { + // write word + WritableUtils.writeVInt(out, _length); + out.write(_bytes, 0, _length); + // write count + out.writeLong(_count); + } + + @Override + public void readFields(DataInput in) throws IOException { + // read word + int newLength = WritableUtils.readVInt(in); + _bytes = new byte[newLength]; + in.readFully(_bytes, 0, newLength); + _length = newLength; + if (_length != _bytes.length) + System.out.println("ERROR in DistinctValue.readFields()"); + // read count + _count = in.readLong(); + } + + public OffsetCount getOffsetCount() { + OffsetCount oc = new OffsetCount(); + String[] parts = getWord().split(","); + oc.filename = parts[0]; + oc.fileOffset = UtilFunctions.parseToLong(parts[1]); + oc.count = getCount(); + + return oc; + } + +} 
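
[Editor's note] DistinctValue above serializes a word/count pair as a variable-length int (the UTF-8 byte length), the raw UTF-8 bytes, and an 8-byte count. The following is a minimal, hypothetical round-trip sketch (not part of this commit) of that wire format using the same Hadoop primitives; class and variable names are illustrative only.

    // Hypothetical sketch: round-trip of the DistinctValue wire format
    // (vint length + UTF-8 bytes + 8-byte count) using Hadoop primitives.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class DistinctValueWireSketch {
        public static void main(String[] args) throws IOException {
            String word = "min1.5";   // e.g. a prefixed value emitted by BinAgent
            long count = -1L;

            // write: vint length, raw UTF-8 bytes, then the long count
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            ByteBuffer bb = Text.encode(word, true);
            WritableUtils.writeVInt(out, bb.limit());
            out.write(bb.array(), 0, bb.limit());
            out.writeLong(count);
            out.flush();

            // read the fields back in the same order
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bos.toByteArray()));
            int len = WritableUtils.readVInt(in);
            byte[] bytes = new byte[len];
            in.readFully(bytes, 0, len);
            long readCount = in.readLong();

            System.out.println(new String(bytes, StandardCharsets.UTF_8) + " / " + readCount);
        }
    }
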
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java index a1c76ba..079ad58 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java @@ -1,426 +1,426 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; - -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.regex.Pattern; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.wink.json4j.JSONArray; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; - -import com.google.common.base.Functions; -import com.google.common.collect.Ordering; -import org.apache.sysml.runtime.util.UtilFunctions; - -public class DummycodeAgent extends TransformationAgent { - - private static final long serialVersionUID = 5832130477659116489L; - - private int[] _dcdList = null; - private long numCols = 0; - - private HashMap<Integer, HashMap<String,String>> _finalMaps = null; - private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null; - private int[] _binList = null; - private int[] _numBins = null; - - private int[] _domainSizes = null; // length = #of dummycoded columns - private int[] _dcdColumnMap = null; // to help in translating between original and dummycoded column IDs - private long _dummycodedLength = 0; // #of columns after dummycoded - - DummycodeAgent(int[] list) { - _dcdList = list; - } - - DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException { - numCols = ncol; - - if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) ) - return; - - JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.DUMMYCODE.toString()); - JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); - - _dcdList = new int[attrs.size()]; - for(int i=0; i < _dcdList.length; i++) - _dcdList[i] = UtilFunctions.toInt(attrs.get(i)); - } - - public int[] dcdList() { - return _dcdList; - } - - /** - * Method to output transformation metadata from the mappers. - * This information is collected and merged by the reducers. 
- * - * @param out - * @throws IOException - * - */ - @Override - public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { - // There is no metadata required for dummycode. - // Required information is output from RecodeAgent. - return; - } - - @Override - public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, - String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { - // Nothing to do here - } - - public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> maps) { - _finalMaps = maps; - } - - public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> maps) { - _finalMapsCP = maps; - } - - public void setNumBins(int[] binList, int[] numbins) { - _binList = binList; - _numBins = numbins; - } - - /** - * Method to generate dummyCodedMaps.csv, with the range of column IDs for each variable in the original data. - * - * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end] - * 1/0 indicates if ColID is dummycoded or not - * [st,end] is the range of dummycoded column numbers for the given ColID - * - * It also generates coltypes.csv, with the type (scale, nominal, etc.) of columns in the output. - * Recoded columns are of type nominal, binner columns are of type ordinal, dummycoded columns are of type - * dummycoded, and the remaining are of type scale. - * - * @param fs - * @param txMtdDir - * @param numCols - * @param ra - * @param ba - * @return Number of columns in the transformed data - * @throws IOException - */ - public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException { - - // initialize all column types in the transformed data to SCALE - ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength]; - for(int i=0; i < _dummycodedLength; i++) - ctypes[i] = ColumnTypes.SCALE; - - _dcdColumnMap = new int[numCols]; - - Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - - int sum=1; - int idx = 0; - for(int colID=1; colID <= numCols; colID++) - { - if ( _dcdList != null && idx < _dcdList.length && _dcdList[idx] == colID ) - { - br.write(colID + "," + "1" + "," + sum + "," + (sum+_domainSizes[idx]-1) + "\n"); - _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1; - - for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++) - ctypes[i-1] = ColumnTypes.DUMMYCODED; - - sum += _domainSizes[idx]; - idx++; - } - else - { - br.write(colID + "," + "0" + "," + sum + "," + sum + "\n"); - _dcdColumnMap[colID-1] = sum-1; - - if ( agents.getBinAgent().isBinned(colID) != -1 ) - ctypes[sum-1] = ColumnTypes.ORDINAL; // binned variable results in an ordinal column - - if ( agents.getRecodeAgent().isRecoded(colID) != -1 ) - ctypes[sum-1] = ColumnTypes.NOMINAL; - - sum += 1; - } - } - br.close(); - - // Write coltypes.csv - pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME); - br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - - br.write(columnTypeToID(ctypes[0]) + ""); - for(int i = 1; i < _dummycodedLength; i++) - br.write( "," + columnTypeToID(ctypes[i])); - br.close(); - - return sum-1; - } - - /** - * Given a dummycoded column id, find the corresponding original column ID. - * - * @param colID - * @return - */ - public int mapDcdColumnID(int colID) - { - for(int i=0; i < _dcdColumnMap.length; i++) - { - int st = (i==0 ? 
1 : _dcdColumnMap[i-1]+1+1); - int end = _dcdColumnMap[i]+1; - //System.out.println((i+1) + ": " + "[" + st + "," + end + "]"); - - if ( colID >= st && colID <= end) - return i+1; - } - return -1; - } - - public String constructDummycodedHeader(String header, Pattern delim) { - - if(_dcdList == null && _binList == null ) - // none of the columns are dummycoded, simply return the given header - return header; - - String[] names = delim.split(header, -1); - List<String> newNames = null; - - StringBuilder sb = new StringBuilder(); - - // Dummycoding can be performed on either on a recoded column or on a binned column - - // process recoded columns - if(_finalMapsCP != null && _dcdList != null) - { - for(int i=0; i <_dcdList.length; i++) - { - int colID = _dcdList[i]; - HashMap<String,Long> map = _finalMapsCP.get(colID); - String colName = UtilFunctions.unquote(names[colID-1]); - - if ( map != null ) - { - // order map entries by their recodeID - Ordering<String> valueComparator = Ordering.natural().onResultOf(Functions.forMap(map)); - newNames = valueComparator.sortedCopy(map.keySet()); - - // construct concatenated string of map entries - sb.setLength(0); - for(int idx=0; idx < newNames.size(); idx++) - { - if(idx==0) - sb.append( colName + DCD_NAME_SEP + newNames.get(idx)); - else - sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx)); - } - names[colID-1] = sb.toString(); // replace original column name with dcd name - } - } - } - else if(_finalMaps != null && _dcdList != null) { - for(int i=0; i <_dcdList.length; i++) { - int colID = _dcdList[i]; - HashMap<String,String> map = _finalMaps.get(colID); - String colName = UtilFunctions.unquote(names[colID-1]); - - if ( map != null ) - { - // order map entries by their recodeID (represented as Strings .. "1", "2", etc.) 
- Ordering<String> orderByID = new Ordering<String>() - { - public int compare(String s1, String s2) { - return (Integer.parseInt(s1) - Integer.parseInt(s2)); - } - }; - - newNames = orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet()); - // construct concatenated string of map entries - sb.setLength(0); - for(int idx=0; idx < newNames.size(); idx++) - { - if(idx==0) - sb.append( colName + DCD_NAME_SEP + newNames.get(idx)); - else - sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx)); - } - names[colID-1] = sb.toString(); // replace original column name with dcd name - } - } - } - - // process binned columns - if (_binList != null) - for(int i=0; i < _binList.length; i++) - { - int colID = _binList[i]; - - // need to consider only binned and dummycoded columns - if(isDummyCoded(colID) == -1) - continue; - - int numBins = _numBins[i]; - String colName = UtilFunctions.unquote(names[colID-1]); - - sb.setLength(0); - for(int idx=0; idx < numBins; idx++) - if(idx==0) - sb.append( colName + DCD_NAME_SEP + "Bin" + (idx+1) ); - else - sb.append( delim + colName + DCD_NAME_SEP + "Bin" + (idx+1) ); - names[colID-1] = sb.toString(); // replace original column name with dcd name - } - - // Construct the full header - sb.setLength(0); - for(int colID=0; colID < names.length; colID++) - { - if (colID == 0) - sb.append(names[colID]); - else - sb.append(delim + names[colID]); - } - //System.out.println("DummycodedHeader: " + sb.toString()); - - return sb.toString(); - } - - @Override - public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - if ( _dcdList == null ) - { - _dummycodedLength = numCols; - return; - } - - // sort to-be dummycoded column IDs in ascending order. This is the order in which the new dummycoded record is constructed in apply() function. - Arrays.sort(_dcdList); - _domainSizes = new int[_dcdList.length]; - - _dummycodedLength = numCols; - - //HashMap<String, String> map = null; - for(int i=0; i<_dcdList.length; i++) { - int colID = _dcdList[i]; - - // Find the domain size for colID using _finalMaps or _finalMapsCP - int domainSize = 0; - if(_finalMaps != null) { - if(_finalMaps.get(colID) != null) - domainSize = _finalMaps.get(colID).size(); - } - else { - if(_finalMapsCP.get(colID) != null) - domainSize = _finalMapsCP.get(colID).size(); - } - - if ( domainSize != 0 ) { - // dummycoded column - _domainSizes[i] = domainSize; - } - else { - // binned column - if ( _binList != null ) - for(int j=0; j<_binList.length; j++) { - if (colID == _binList[j]) { - _domainSizes[i] = _numBins[j]; - break; - } - } - } - _dummycodedLength += _domainSizes[i]-1; - //System.out.println("colID=" + colID + ", domainsize=" + _domainSizes[i] + ", dcdLength=" + _dummycodedLength); - } - } - - /** - * Method to apply transformations. 
- * - * @param words - * @return - */ - @Override - public String[] apply(String[] words, TfUtils agents) { - - if ( _dcdList == null ) - return words; - - String[] nwords = new String[(int)_dummycodedLength]; - - int rcdVal = 0; - - for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) { - if(idx < _dcdList.length && colID==_dcdList[idx]) { - // dummycoded columns - try { - rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1])); - nwords[ ncolID-1+rcdVal-1 ] = "1"; - ncolID += _domainSizes[idx]; - idx++; - } catch (Exception e) { - System.out.println("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength); - throw new RuntimeException(e); - } - } - else { - nwords[ncolID-1] = words[colID-1]; - ncolID++; - } - } - - return nwords; - } - - /** - * Check if the given column ID is subjected to this transformation. - * - */ - public int isDummyCoded(int colID) - { - if(_dcdList == null) - return -1; - - int idx = Arrays.binarySearch(_dcdList, colID); - return ( idx >= 0 ? idx : -1); - } - - @Override - public void print() { - System.out.print("Dummycoding List: \n "); - for(int i : _dcdList) { - System.out.print(i + " "); - } - System.out.println(); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.transform; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.wink.json4j.JSONArray; +import org.apache.wink.json4j.JSONException; +import org.apache.wink.json4j.JSONObject; + +import com.google.common.base.Functions; +import com.google.common.collect.Ordering; +import org.apache.sysml.runtime.util.UtilFunctions; + +public class DummycodeAgent extends TransformationAgent { + + private static final long serialVersionUID = 5832130477659116489L; + + private int[] _dcdList = null; + private long numCols = 0; + + private HashMap<Integer, HashMap<String,String>> _finalMaps = null; + private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null; + private int[] _binList = null; + private int[] _numBins = null; + + private int[] _domainSizes = null; // length = #of dummycoded columns + private int[] _dcdColumnMap = null; // to help in translating between original and dummycoded column IDs + private long _dummycodedLength = 0; // #of columns after dummycoded + + DummycodeAgent(int[] list) { + _dcdList = list; + } + + DummycodeAgent(JSONObject parsedSpec, long ncol) throws JSONException { + numCols = ncol; + + if ( !parsedSpec.containsKey(TX_METHOD.DUMMYCODE.toString()) ) + return; + + JSONObject obj = (JSONObject) parsedSpec.get(TX_METHOD.DUMMYCODE.toString()); + JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS); + + _dcdList = new int[attrs.size()]; + for(int i=0; i < _dcdList.length; i++) + _dcdList[i] = UtilFunctions.toInt(attrs.get(i)); + } + + public int[] dcdList() { + return _dcdList; + } + + /** + * Method to output transformation metadata from the mappers. + * This information is collected and merged by the reducers. + * + * @param out + * @throws IOException + * + */ + @Override + public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { + // There is no metadata required for dummycode. + // Required information is output from RecodeAgent. + return; + } + + @Override + public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, + String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { + // Nothing to do here + } + + public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> maps) { + _finalMaps = maps; + } + + public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> maps) { + _finalMapsCP = maps; + } + + public void setNumBins(int[] binList, int[] numbins) { + _binList = binList; + _numBins = numbins; + } + + /** + * Method to generate dummyCodedMaps.csv, with the range of column IDs for each variable in the original data. + * + * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end] + * 1/0 indicates if ColID is dummycoded or not + * [st,end] is the range of dummycoded column numbers for the given ColID + * + * It also generates coltypes.csv, with the type (scale, nominal, etc.) of columns in the output. + * Recoded columns are of type nominal, binner columns are of type ordinal, dummycoded columns are of type + * dummycoded, and the remaining are of type scale. 
+ * + * @param fs + * @param txMtdDir + * @param numCols + * @param ra + * @param ba + * @return Number of columns in the transformed data + * @throws IOException + */ + public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException { + + // initialize all column types in the transformed data to SCALE + ColumnTypes[] ctypes = new ColumnTypes[(int) _dummycodedLength]; + for(int i=0; i < _dummycodedLength; i++) + ctypes[i] = ColumnTypes.SCALE; + + _dcdColumnMap = new int[numCols]; + + Path pt=new Path(txMtdDir+"/Dummycode/" + DCD_FILE_NAME); + BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); + + int sum=1; + int idx = 0; + for(int colID=1; colID <= numCols; colID++) + { + if ( _dcdList != null && idx < _dcdList.length && _dcdList[idx] == colID ) + { + br.write(colID + "," + "1" + "," + sum + "," + (sum+_domainSizes[idx]-1) + "\n"); + _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1; + + for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++) + ctypes[i-1] = ColumnTypes.DUMMYCODED; + + sum += _domainSizes[idx]; + idx++; + } + else + { + br.write(colID + "," + "0" + "," + sum + "," + sum + "\n"); + _dcdColumnMap[colID-1] = sum-1; + + if ( agents.getBinAgent().isBinned(colID) != -1 ) + ctypes[sum-1] = ColumnTypes.ORDINAL; // binned variable results in an ordinal column + + if ( agents.getRecodeAgent().isRecoded(colID) != -1 ) + ctypes[sum-1] = ColumnTypes.NOMINAL; + + sum += 1; + } + } + br.close(); + + // Write coltypes.csv + pt=new Path(txMtdDir+"/" + COLTYPES_FILE_NAME); + br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); + + br.write(columnTypeToID(ctypes[0]) + ""); + for(int i = 1; i < _dummycodedLength; i++) + br.write( "," + columnTypeToID(ctypes[i])); + br.close(); + + return sum-1; + } + + /** + * Given a dummycoded column id, find the corresponding original column ID. + * + * @param colID + * @return + */ + public int mapDcdColumnID(int colID) + { + for(int i=0; i < _dcdColumnMap.length; i++) + { + int st = (i==0 ? 
1 : _dcdColumnMap[i-1]+1+1); + int end = _dcdColumnMap[i]+1; + //System.out.println((i+1) + ": " + "[" + st + "," + end + "]"); + + if ( colID >= st && colID <= end) + return i+1; + } + return -1; + } + + public String constructDummycodedHeader(String header, Pattern delim) { + + if(_dcdList == null && _binList == null ) + // none of the columns are dummycoded, simply return the given header + return header; + + String[] names = delim.split(header, -1); + List<String> newNames = null; + + StringBuilder sb = new StringBuilder(); + + // Dummycoding can be performed on either on a recoded column or on a binned column + + // process recoded columns + if(_finalMapsCP != null && _dcdList != null) + { + for(int i=0; i <_dcdList.length; i++) + { + int colID = _dcdList[i]; + HashMap<String,Long> map = _finalMapsCP.get(colID); + String colName = UtilFunctions.unquote(names[colID-1]); + + if ( map != null ) + { + // order map entries by their recodeID + Ordering<String> valueComparator = Ordering.natural().onResultOf(Functions.forMap(map)); + newNames = valueComparator.sortedCopy(map.keySet()); + + // construct concatenated string of map entries + sb.setLength(0); + for(int idx=0; idx < newNames.size(); idx++) + { + if(idx==0) + sb.append( colName + DCD_NAME_SEP + newNames.get(idx)); + else + sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx)); + } + names[colID-1] = sb.toString(); // replace original column name with dcd name + } + } + } + else if(_finalMaps != null && _dcdList != null) { + for(int i=0; i <_dcdList.length; i++) { + int colID = _dcdList[i]; + HashMap<String,String> map = _finalMaps.get(colID); + String colName = UtilFunctions.unquote(names[colID-1]); + + if ( map != null ) + { + // order map entries by their recodeID (represented as Strings .. "1", "2", etc.) 
+ Ordering<String> orderByID = new Ordering<String>() + { + public int compare(String s1, String s2) { + return (Integer.parseInt(s1) - Integer.parseInt(s2)); + } + }; + + newNames = orderByID.onResultOf(Functions.forMap(map)).sortedCopy(map.keySet()); + // construct concatenated string of map entries + sb.setLength(0); + for(int idx=0; idx < newNames.size(); idx++) + { + if(idx==0) + sb.append( colName + DCD_NAME_SEP + newNames.get(idx)); + else + sb.append( delim + colName + DCD_NAME_SEP + newNames.get(idx)); + } + names[colID-1] = sb.toString(); // replace original column name with dcd name + } + } + } + + // process binned columns + if (_binList != null) + for(int i=0; i < _binList.length; i++) + { + int colID = _binList[i]; + + // need to consider only binned and dummycoded columns + if(isDummyCoded(colID) == -1) + continue; + + int numBins = _numBins[i]; + String colName = UtilFunctions.unquote(names[colID-1]); + + sb.setLength(0); + for(int idx=0; idx < numBins; idx++) + if(idx==0) + sb.append( colName + DCD_NAME_SEP + "Bin" + (idx+1) ); + else + sb.append( delim + colName + DCD_NAME_SEP + "Bin" + (idx+1) ); + names[colID-1] = sb.toString(); // replace original column name with dcd name + } + + // Construct the full header + sb.setLength(0); + for(int colID=0; colID < names.length; colID++) + { + if (colID == 0) + sb.append(names[colID]); + else + sb.append(delim + names[colID]); + } + //System.out.println("DummycodedHeader: " + sb.toString()); + + return sb.toString(); + } + + @Override + public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { + if ( _dcdList == null ) + { + _dummycodedLength = numCols; + return; + } + + // sort to-be dummycoded column IDs in ascending order. This is the order in which the new dummycoded record is constructed in apply() function. + Arrays.sort(_dcdList); + _domainSizes = new int[_dcdList.length]; + + _dummycodedLength = numCols; + + //HashMap<String, String> map = null; + for(int i=0; i<_dcdList.length; i++) { + int colID = _dcdList[i]; + + // Find the domain size for colID using _finalMaps or _finalMapsCP + int domainSize = 0; + if(_finalMaps != null) { + if(_finalMaps.get(colID) != null) + domainSize = _finalMaps.get(colID).size(); + } + else { + if(_finalMapsCP.get(colID) != null) + domainSize = _finalMapsCP.get(colID).size(); + } + + if ( domainSize != 0 ) { + // dummycoded column + _domainSizes[i] = domainSize; + } + else { + // binned column + if ( _binList != null ) + for(int j=0; j<_binList.length; j++) { + if (colID == _binList[j]) { + _domainSizes[i] = _numBins[j]; + break; + } + } + } + _dummycodedLength += _domainSizes[i]-1; + //System.out.println("colID=" + colID + ", domainsize=" + _domainSizes[i] + ", dcdLength=" + _dummycodedLength); + } + } + + /** + * Method to apply transformations. 
+ * + * @param words + * @return + */ + @Override + public String[] apply(String[] words, TfUtils agents) { + + if ( _dcdList == null ) + return words; + + String[] nwords = new String[(int)_dummycodedLength]; + + int rcdVal = 0; + + for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) { + if(idx < _dcdList.length && colID==_dcdList[idx]) { + // dummycoded columns + try { + rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1])); + nwords[ ncolID-1+rcdVal-1 ] = "1"; + ncolID += _domainSizes[idx]; + idx++; + } catch (Exception e) { + System.out.println("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength); + throw new RuntimeException(e); + } + } + else { + nwords[ncolID-1] = words[colID-1]; + ncolID++; + } + } + + return nwords; + } + + /** + * Check if the given column ID is subjected to this transformation. + * + */ + public int isDummyCoded(int colID) + { + if(_dcdList == null) + return -1; + + int idx = Arrays.binarySearch(_dcdList, colID); + return ( idx >= 0 ? idx : -1); + } + + @Override + public void print() { + System.out.print("Dummycoding List: \n "); + for(int i : _dcdList) { + System.out.print(i + " "); + } + System.out.println(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/05d2c0a8/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java index e254403..4e3ece5 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java +++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java @@ -1,107 +1,107 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; - -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.wink.json4j.JSONException; - -import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; - - -public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{ - - private OutputCollector<IntWritable, DistinctValue> _collector = null; - private int _mapTaskID = -1; - - TfUtils _agents = null; - - private boolean _partFileWithHeader = false; - private boolean _firstRecordInSplit = true; - private String _partFileName = null; - private long _offsetInPartFile = -1; - - // ---------------------------------------------------------------------------------------------- - - /** - * Configure the information used in the mapper, and setup transformation agents. - */ - @Override - public void configure(JobConf job) { - String[] parts = job.get("mapred.task.id").split("_"); - if ( parts[0].equalsIgnoreCase("task")) { - _mapTaskID = Integer.parseInt(parts[parts.length-1]); - } - else if ( parts[0].equalsIgnoreCase("attempt")) { - _mapTaskID = Integer.parseInt(parts[parts.length-2]); - } - else { - throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id")); - } - - try { - _partFileName = TfUtils.getPartFileName(job); - _partFileWithHeader = TfUtils.isPartFileWithHeader(job); - _agents = new TfUtils(job); - } catch(IOException e) { throw new RuntimeException(e); } - catch(JSONException e) { throw new RuntimeException(e); } - - } - - - public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException { - - if(_firstRecordInSplit) - { - _firstRecordInSplit = false; - _collector = out; - _offsetInPartFile = rawKey.get(); - } - - // ignore header - if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader) - return; - - _agents.prepareTfMtd(rawValue.toString()); - } - - @Override - public void close() throws IOException { - _agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); - _agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); - _agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); - - // Output part-file offsets to create OFFSETS_FILE, which is to be used in csv reblocking. - // OffsetCount is denoted as a DistinctValue by concatenating parfile name and offset within partfile. - _collector.collect(new IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid()))); - - // reset global variables, required when the jvm is reused. - _firstRecordInSplit = true; - _offsetInPartFile = -1; - _partFileWithHeader = false; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.transform; +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.wink.json4j.JSONException; + +import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; + + +public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{ + + private OutputCollector<IntWritable, DistinctValue> _collector = null; + private int _mapTaskID = -1; + + TfUtils _agents = null; + + private boolean _partFileWithHeader = false; + private boolean _firstRecordInSplit = true; + private String _partFileName = null; + private long _offsetInPartFile = -1; + + // ---------------------------------------------------------------------------------------------- + + /** + * Configure the information used in the mapper, and setup transformation agents. + */ + @Override + public void configure(JobConf job) { + String[] parts = job.get("mapred.task.id").split("_"); + if ( parts[0].equalsIgnoreCase("task")) { + _mapTaskID = Integer.parseInt(parts[parts.length-1]); + } + else if ( parts[0].equalsIgnoreCase("attempt")) { + _mapTaskID = Integer.parseInt(parts[parts.length-2]); + } + else { + throw new RuntimeException("Unrecognized format for taskID: " + job.get("mapred.task.id")); + } + + try { + _partFileName = TfUtils.getPartFileName(job); + _partFileWithHeader = TfUtils.isPartFileWithHeader(job); + _agents = new TfUtils(job); + } catch(IOException e) { throw new RuntimeException(e); } + catch(JSONException e) { throw new RuntimeException(e); } + + } + + + public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException { + + if(_firstRecordInSplit) + { + _firstRecordInSplit = false; + _collector = out; + _offsetInPartFile = rawKey.get(); + } + + // ignore header + if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader) + return; + + _agents.prepareTfMtd(rawValue.toString()); + } + + @Override + public void close() throws IOException { + _agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); + _agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); + _agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); + + // Output part-file offsets to create OFFSETS_FILE, which is to be used in csv reblocking. + // OffsetCount is denoted as a DistinctValue by concatenating parfile name and offset within partfile. + _collector.collect(new IntWritable((int)_agents.getNumCols()+1), new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid()))); + + // reset global variables, required when the jvm is reused. + _firstRecordInSplit = true; + _offsetInPartFile = -1; + _partFileWithHeader = false; + } + +}
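The DummycodeAgent diff above centers on apply(), which expands every recoded or binned column into one output column per category while copying the remaining columns through unchanged. The following standalone sketch is not SystemML code; the class and method names are hypothetical, and it simplifies by writing unset dummy positions as "0" and by taking domain sizes as an argument, where the real agent unquotes tokens and derives the sizes from the recode/bin metadata. It only illustrates the expansion that apply() performs on one row.

import java.util.Arrays;

public class DummycodeSketch {

    /**
     * Expand a recoded row into its dummycoded form.
     *
     * @param words       input row; each dummycoded column holds a recode ID in 1..domainSize
     * @param dcdList     sorted 1-based IDs of the columns to dummycode
     * @param domainSizes domain size per entry of dcdList
     * @param outLen      number of output columns: numCols + sum(domainSize - 1)
     */
    static String[] expand(String[] words, int[] dcdList, int[] domainSizes, int outLen) {
        String[] out = new String[outLen];
        Arrays.fill(out, "0");                        // simplification: unset dummy positions become "0"
        int idx = 0;                                  // position within dcdList/domainSizes
        int ncol = 1;                                 // 1-based column ID in the output row
        for (int col = 1; col <= words.length; col++) {
            if (idx < dcdList.length && col == dcdList[idx]) {
                int rcd = Integer.parseInt(words[col - 1]);   // recode ID of this category
                out[ncol - 1 + rcd - 1] = "1";                // one-hot position for that category
                ncol += domainSizes[idx];
                idx++;
            } else {
                out[ncol - 1] = words[col - 1];               // pass-through column
                ncol++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Column 2 is recoded with domain size 3, so a 3-column row becomes 3 + (3-1) = 5 columns:
        // (5.0, "2", 7.1) -> (5.0, 0, 1, 0, 7.1)
        String[] row = {"5.0", "2", "7.1"};
        System.out.println(Arrays.toString(expand(row, new int[]{2}, new int[]{3}, 5)));
    }
}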

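In the same DummycodeAgent diff, genDcdMapsAndColTypes() writes one "ColID,1/0,st,end" record per original column into dummyCodedMaps.csv, and mapDcdColumnID() walks those ranges to translate a dummycoded column ID back to its source column. Below is a minimal sketch of that reverse lookup; the class and method names are hypothetical, and the ranges are supplied directly as arrays rather than parsed from the metadata file, so only the record semantics are taken from the diff.

public class DcdMapSketch {

    // For original column i (0-based), dummycoded output columns cover [st[i], end[i]] (1-based),
    // i.e. the st/end fields of the "ColID,1/0,st,end" records.
    private final int[] _st;
    private final int[] _end;

    public DcdMapSketch(int[] st, int[] end) {
        _st = st;
        _end = end;
    }

    /** Return the 1-based original column ID owning dummycoded column dcdColID, or -1 if none. */
    public int mapToOriginal(int dcdColID) {
        for (int i = 0; i < _st.length; i++)
            if (dcdColID >= _st[i] && dcdColID <= _end[i])
                return i + 1;
        return -1;
    }

    public static void main(String[] args) {
        // Example layout: column 1 is scale ([1,1]), column 2 is dummycoded with
        // domain size 3 ([2,4]), column 3 is scale ([5,5]).
        DcdMapSketch m = new DcdMapSketch(new int[]{1, 2, 5}, new int[]{1, 4, 5});
        System.out.println(m.mapToOriginal(3));   // prints 2
        System.out.println(m.mapToOriginal(5));   // prints 3
    }
}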