http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java deleted file mode 100644 index 77c06ae..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.wink.json4j.JSONException; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction; -import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.matrix.CSVReblockMR; -import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; -import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes; -import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper; -import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; -import org.apache.sysml.runtime.matrix.mapred.CSVReblockMapper.IndexedBlockRow; -import org.apache.sysml.runtime.matrix.mapred.MapperBase; - -@SuppressWarnings("deprecation") -public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, Text, TaggedFirstSecondIndexes, CSVReblockMR.BlockRow>{ - - boolean _partFileWithHeader = false; - TfUtils tfmapper = null; - Reporter _reporter = null; - - // variables relevant to CSV Reblock - private IndexedBlockRow idxRow = null; - private long rowOffset=0; - private HashMap<Long, Long> offsetMap=new HashMap<Long, Long>(); - private boolean _first = true; - private long num=0; - - @Override - public void configure(JobConf job) { - super.configure(job); - try { - _partFileWithHeader = TfUtils.isPartFileWithHeader(job); - tfmapper = new TfUtils(job); - tfmapper.loadTfMetadata(job, true); - - // Load relevant information for CSV Reblock - ByteWritable key=new ByteWritable(); - OffsetCount value=new OffsetCount(); - Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME)); - - Path path=new 
Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)); - FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - String thisfile=path.makeQualified(fs).toString(); - - SequenceFile.Reader reader = null; - try { - reader = new SequenceFile.Reader(fs, p, job); - while (reader.next(key, value)) { - // "key" needn't be checked since the offset file has information about a single CSV input (the raw data file) - if(thisfile.equals(value.filename)) - offsetMap.put(value.fileOffset, value.count); - } - } - finally { - IOUtilFunctions.closeSilently(reader); - } - - idxRow = new CSVReblockMapper.IndexedBlockRow(); - int maxBclen=0; - - for(ArrayList<CSVReblockInstruction> insv: csv_reblock_instructions) - for(CSVReblockInstruction in: insv) - { - if(maxBclen<in.bclen) - maxBclen=in.bclen; - } - - //always dense since common csv usecase - idxRow.getRow().data.reset(1, maxBclen, false); - - } catch (IOException e) { throw new RuntimeException(e); } - catch(JSONException e) { throw new RuntimeException(e); } - - } - - @Override - public void map(LongWritable rawKey, Text rawValue, OutputCollector<TaggedFirstSecondIndexes,CSVReblockMR.BlockRow> out, Reporter reporter) throws IOException { - - if(_first) { - rowOffset=offsetMap.get(rawKey.get()); - _reporter = reporter; - _first=false; - } - - // output the header line - if ( rawKey.get() == 0 && _partFileWithHeader ) - { - tfmapper.processHeaderLine(); - if ( tfmapper.hasHeader() ) - return; - } - - // parse the input line and apply transformation - String[] words = tfmapper.getWords(rawValue); - - if(!tfmapper.omit(words)) - { - words = tfmapper.apply(words); - try { - tfmapper.check(words); - - // Perform CSV Reblock - CSVReblockInstruction ins = csv_reblock_instructions.get(0).get(0); - idxRow = CSVReblockMapper.processRow(idxRow, words, rowOffset, num, ins.output, ins.brlen, ins.bclen, ins.fill, ins.fillValue, out); - } - catch(DMLRuntimeException e) { - throw new RuntimeException(e.getMessage() + ":" + rawValue.toString()); - } - num++; - } - } - - @Override - public void close() throws IOException { - } - - @Override - protected void specialOperationsForActualMap(int index, - OutputCollector<Writable, Writable> out, Reporter reporter) - throws IOException { - } - -}
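The deleted ApplyTfBBMapper resolves each map task's global row offset by looking up the byte offset of its split's first record in the offsets file produced by an earlier counting pass (the CSVReblockMR.ROWID_FILE_NAME sequence file read in configure()). A minimal, self-contained sketch of that lookup idea follows; it is not SystemML code, and the byte offsets and row counts are hypothetical example values.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: a map task finds the number of rows that precede its input
    // split by keying a precomputed map on the byte offset of the split's
    // first record, which is the role the ROWID offsets file plays above.
    public class RowOffsetLookupSketch {
        public static void main(String[] args) {
            // hypothetical values: split byte offset -> rows preceding that split
            Map<Long, Long> offsetMap = new HashMap<>();
            offsetMap.put(0L, 0L);             // first split starts at global row 0
            offsetMap.put(67108864L, 250000L); // second split starts after 250000 rows

            long firstRecordByteOffset = 67108864L; // key of this mapper's first record
            long rowOffset = offsetMap.get(firstRecordByteOffset);

            long k = 10; // k-th record within this split (0-based)
            System.out.println("global row index = " + (rowOffset + k));
        }
    }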
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java deleted file mode 100644 index e2885d8..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.FileNotFoundException; -import java.io.IOException; - -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.matrix.CSVReblockMR; -import org.apache.sysml.runtime.matrix.JobReturn; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; - - -@SuppressWarnings("deprecation") -public class ApplyTfCSVMR { - - public static JobReturn runJob(String inputPath, String spec, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numCols, int replication, String headerLine) throws IOException, ClassNotFoundException, InterruptedException { - JobConf job = new JobConf(ApplyTfCSVMR.class); - job.setJobName("ApplyTfCSV"); - - /* Setup MapReduce Job */ - job.setJarByClass(ApplyTfCSVMR.class); - - // set relevant classes - job.setMapperClass(ApplyTfCSVMapper.class); - job.setNumReduceTasks(0); - - // Add transformation metadata file as well as partOffsetsFile to Distributed cache - DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job); - DistributedCache.createSymlink(job); - - Path cachefile=new Path(partOffsetsFile); - DistributedCache.addCacheFile(cachefile.toUri(), job); - DistributedCache.createSymlink(job); - - // set input and output properties - job.setInputFormat(TextInputFormat.class); - 
job.setOutputFormat(TextOutputFormat.class); - - job.setMapOutputKeyClass(NullWritable.class); - job.setMapOutputValueClass(Text.class); - - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(Text.class); - - job.setInt(MRConfigurationNames.DFS_REPLICATION, replication); - - FileInputFormat.addInputPath(job, new Path(inputPath)); - // delete outputPath, if exists already. - Path outPath = new Path(outputPath); - FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job); - fs.delete(outPath, true); - FileOutputFormat.setOutputPath(job, outPath); - - job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader())); - job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim()); - if ( inputDataProperties.getNAStrings() != null) - // Adding "dummy" string to handle the case of na_strings = "" - job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) ); - job.set(MRJobConfiguration.TF_SPEC, spec); - job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath)); - job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath); - job.setLong(MRJobConfiguration.TF_NUM_COLS, numCols); - job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath); - job.set(MRJobConfiguration.TF_HEADER, headerLine); - job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString()); - job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath); - - //turn off adaptivemr - job.setBoolean("adaptivemr.map.enable", false); - - // Run the job - RunningJob runjob = JobClient.runJob(job); - - // Since transform CSV produces part files w/ prefix transform-part-*, - // delete all the "default" part-..... files - deletePartFiles(fs, outPath); - - MatrixCharacteristics mc = new MatrixCharacteristics(); - return new JobReturn(new MatrixCharacteristics[]{mc}, runjob.isSuccessful()); - } - - private static void deletePartFiles(FileSystem fs, Path path) throws FileNotFoundException, IOException - { - PathFilter filter=new PathFilter(){ - public boolean accept(Path file) { - return file.getName().startsWith("part-"); - } - }; - FileStatus[] list = fs.listStatus(path, filter); - for(FileStatus stat : list) { - fs.delete(stat.getPath(), false); - } - } - -} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java deleted file mode 100644 index 05b8a19..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.wink.json4j.JSONException; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.io.IOUtilFunctions; - -public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> { - - boolean _firstRecordInSplit = true; - boolean _partFileWithHeader = false; - - TfUtils tfmapper = null; - Reporter _reporter = null; - BufferedWriter br = null; - JobConf _rJob = null; - - @Override - public void configure(JobConf job) { - try { - _rJob = job; - _partFileWithHeader = TfUtils.isPartFileWithHeader(job); - tfmapper = new TfUtils(job); - - tfmapper.loadTfMetadata(job, true); - - } catch (IOException e) { throw new RuntimeException(e); } - catch(JSONException e) { throw new RuntimeException(e); } - - } - - @Override - public void map(LongWritable rawKey, Text rawValue, OutputCollector<NullWritable, Text> out, Reporter reporter) throws IOException { - - if(_firstRecordInSplit) - { - _firstRecordInSplit = false; - _reporter = reporter; - - // generate custom output paths so that order of rows in the - // output (across part files) matches w/ that from input data set - String partFileSuffix = tfmapper.getPartFileID(_rJob, rawKey.get()); - Path mapOutputPath = new Path(tfmapper.getOutputPath() + "/transform-part-" + partFileSuffix); - - // setup the writer for mapper's output - // the default part-..... 
files will be deleted later once the job finishes - FileSystem fs = IOUtilFunctions.getFileSystem(mapOutputPath); - br = new BufferedWriter(new OutputStreamWriter(fs.create( mapOutputPath, true))); - } - - // output the header line - if ( rawKey.get() == 0 && _partFileWithHeader ) - { - _reporter = reporter; - tfmapper.processHeaderLine(); - if ( tfmapper.hasHeader() ) - return; - } - - // parse the input line and apply transformation - String[] words = tfmapper.getWords(rawValue); - - if(!tfmapper.omit(words)) - { - try { - words = tfmapper.apply(words); - String outStr = tfmapper.checkAndPrepOutputString(words); - //out.collect(NullWritable.get(), new Text(outStr)); - br.write(outStr + "\n"); - } - catch(DMLRuntimeException e) { - throw new RuntimeException(e.getMessage() + ": " + rawValue.toString()); - } - } - } - - @Override - public void close() throws IOException { - IOUtilFunctions.closeSilently(br); - } - -} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java deleted file mode 100644 index b820449..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVSPARK.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.broadcast.Broadcast; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; - -import scala.Tuple2; - -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; - - -public class ApplyTfCSVSPARK { - - /** - * Apply transformation metadata and generate the result in CSV format, as a - * JavaRDD of Strings. 
- * - * @param sec spark execution context - * @param inputRDD input rdd - * @param tfMtdPath transform metadata path - * @param spec transform specification as json string - * @param tmpPath temporary file path - * @param prop csv file format properties - * @param numCols number of columns - * @param headerLine header line - * @return JavaPairRDD of long-strings - * @throws IOException if IOException occurs - * @throws ClassNotFoundException if ClassNotFoundException occurs - * @throws InterruptedException if InterruptedException occurs - * @throws IllegalArgumentException if IllegalArgumentException occurs - * @throws JSONException if JSONException occurs - */ - public static JavaPairRDD<Long, String> runSparkJob( - SparkExecutionContext sec, JavaRDD<Tuple2<LongWritable, Text>> inputRDD, - String tfMtdPath, String spec, String tmpPath, CSVFileFormatProperties prop, - int numCols, String headerLine) - throws IOException, ClassNotFoundException, InterruptedException, IllegalArgumentException, JSONException - { - // Load transformation metadata and broadcast it - String[] naStrings = TfUtils.parseNAStrings(prop.getNAStrings()); - JSONObject jspec = new JSONObject(spec); - TfUtils _tfmapper = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), naStrings, jspec, numCols, tfMtdPath, null, tmpPath); - - _tfmapper.loadTfMetadata(); - - Broadcast<TfUtils> bcast_tf = sec.getSparkContext().broadcast(_tfmapper); - - /* - * Construct transformation metadata (map-side) -- the logic is similar - * to GTFMTDMapper - * - * Note: The result of mapPartitionsWithIndex is cached so that the - * transformed data is not redundantly computed multiple times - */ - JavaPairRDD<Long, String> applyRDD = inputRDD - .mapPartitionsWithIndex( new ApplyTfCSVMap(bcast_tf), true) - .mapToPair( - new PairFunction<String,Long,String>(){ - private static final long serialVersionUID = 3868143093999082931L; - @Override - public Tuple2<Long, String> call(String t) throws Exception { - return new Tuple2<Long, String>(new Long(1), t); - } - } - ).cache(); - - /* - * An action to force execution of apply() - * - * We need to trigger the execution of this RDD so as to ensure the - * creation of a few metadata files (headers, dummycoded information, - * etc.), which are referenced in the caller function. 
- */ - applyRDD.count(); - - return applyRDD; - } - - public static class ApplyTfCSVMap implements Function2<Integer, Iterator<Tuple2<LongWritable, Text>>, Iterator<String>> { - - private static final long serialVersionUID = 1496686437276906911L; - - TfUtils _tfmapper = null; - - ApplyTfCSVMap(Broadcast<TfUtils> tf) throws IllegalArgumentException, IOException, JSONException { - _tfmapper = tf.getValue(); - } - - @Override - public Iterator<String> call(Integer partitionID, - Iterator<Tuple2<LongWritable, Text>> csvLines) throws Exception { - - boolean first = true; - Tuple2<LongWritable, Text> rec = null; - ArrayList<String> outLines = new ArrayList<String>(); - - while(csvLines.hasNext()) { - rec = csvLines.next(); - - if (first && partitionID == 0) { - first = false; - - _tfmapper.processHeaderLine(); - - if (_tfmapper.hasHeader() ) { - continue; - } - } - - // parse the input line and apply transformation - - String[] words = _tfmapper.getWords(rec._2()); - - if(!_tfmapper.omit(words)) - { - try { - words = _tfmapper.apply(words); - String outStr = _tfmapper.checkAndPrepOutputString(words); - outLines.add(outStr); - } - catch(DMLRuntimeException e) { - throw new RuntimeException(e.getMessage() + ": " + rec._2().toString()); - } - } - } - - return outLines.iterator(); - } - - } - - -} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java deleted file mode 100644 index 8878ff0..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.transform; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.nio.charset.CharacterCodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.wink.json4j.JSONArray; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; -import org.apache.sysml.lops.Lop; -import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.matrix.data.FrameBlock; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.Pair; -import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; -import org.apache.sysml.runtime.transform.encode.Encoder; -import org.apache.sysml.runtime.transform.meta.TfMetaUtils; -import org.apache.sysml.runtime.util.UtilFunctions; - -public class BinAgent extends Encoder -{ - private static final long serialVersionUID = 1917445005206076078L; - - public static final String MIN_PREFIX = "min"; - public static final String MAX_PREFIX = "max"; - public static final String NBINS_PREFIX = "nbins"; - - private int[] _numBins = null; - private double[] _min=null, _max=null; // min and max among non-missing values - private double[] _binWidths = null; // width of a bin for each attribute - - //frame transform-apply attributes - private double[][] _binMins = null; - private double[][] _binMaxs = null; - - public BinAgent(JSONObject parsedSpec, String[] colnames, int clen) - throws JSONException, IOException - { - this(parsedSpec, colnames, clen, false); - } - - public BinAgent(JSONObject parsedSpec, String[] colnames, int clen, boolean colsOnly) - throws JSONException, IOException - { - super( null, clen ); - if ( !parsedSpec.containsKey(TfUtils.TXMETHOD_BIN) ) - return; - - if( colsOnly ) { - List<Integer> collist = TfMetaUtils.parseBinningColIDs(parsedSpec, colnames); - initColList(ArrayUtils.toPrimitive(collist.toArray(new Integer[0]))); - } - else - { - JSONObject obj = (JSONObject) parsedSpec.get(TfUtils.TXMETHOD_BIN); - JSONArray attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); - JSONArray nbins = (JSONArray) obj.get(TfUtils.JSON_NBINS); - initColList(attrs); - - _numBins = new int[attrs.size()]; - for(int i=0; i < _numBins.length; i++) - _numBins[i] = UtilFunctions.toInt(nbins.get(i)); - - // initialize internal transformation metadata - _min = new double[_colList.length]; - Arrays.fill(_min, Double.MAX_VALUE); - _max = new double[_colList.length]; - Arrays.fill(_max, -Double.MAX_VALUE); - - _binWidths = new double[_colList.length]; - } - } - - public int[] getNumBins() { return _numBins; } - public double[] getMin() { return _min; } - public double[] getBinWidths() { return _binWidths; } - - public void prepare(String[] words, TfUtils agents) { - if ( !isApplicable() ) - return; - - for(int i=0; i <_colList.length; i++) { - int colID = _colList[i]; - - String w = null; - double d = 0; - - // equi-width - w = UtilFunctions.unquote(words[colID-1].trim()); - if(!TfUtils.isNA(agents.getNAStrings(),w)) { - d = UtilFunctions.parseToDouble(w); - if(d < _min[i]) - _min[i] = d; - if(d > 
_max[i]) - _max[i] = d; - } - } - } - - private DistinctValue prepMinOutput(int idx) throws CharacterCodingException { - String s = MIN_PREFIX + Double.toString(_min[idx]); - return new DistinctValue(s, -1L); - } - - private DistinctValue prepMaxOutput(int idx) throws CharacterCodingException { - String s = MAX_PREFIX + Double.toString(_max[idx]); - return new DistinctValue(s, -1L); - } - - private DistinctValue prepNBinsOutput(int idx) throws CharacterCodingException { - String s = NBINS_PREFIX + Double.toString(_numBins[idx]); - return new DistinctValue(s, -1L); - } - - /** - * Method to output transformation metadata from the mappers. - * This information is collected and merged by the reducers. - */ - @Override - public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { - if( !isApplicable() ) - return; - - try { - for(int i=0; i < _colList.length; i++) { - int colID = _colList[i]; - IntWritable iw = new IntWritable(-colID); - - out.collect(iw, prepMinOutput(i)); - out.collect(iw, prepMaxOutput(i)); - out.collect(iw, prepNBinsOutput(i)); - } - } catch(Exception e) { - throw new IOException(e); - } - } - - public ArrayList<Pair<Integer, DistinctValue>> mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils agents) throws IOException { - if ( !isApplicable() ) - return list; - - try { - for(int i=0; i < _colList.length; i++) { - int colID = _colList[i]; - Integer iw = -colID; - - list.add( new Pair<Integer,DistinctValue>(iw, prepMinOutput(i)) ); - list.add( new Pair<Integer,DistinctValue>(iw, prepMaxOutput(i)) ); - list.add( new Pair<Integer,DistinctValue>(iw, prepNBinsOutput(i)) ); - } - } catch(Exception e) { - throw new IOException(e); - } - return list; - } - - private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException - { - Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); - BufferedWriter br = null; - try { - br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); - } - finally { - IOUtilFunctions.closeSilently(br); - } - } - - /** - * Method to merge map output transformation metadata. 
- */ - @Override - public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { - double min = Double.MAX_VALUE; - double max = -Double.MAX_VALUE; - int nbins = 0; - - DistinctValue val = new DistinctValue(); - String w = null; - double d; - while(values.hasNext()) { - val.reset(); - val = values.next(); - w = val.getWord(); - - if(w.startsWith(MIN_PREFIX)) { - d = UtilFunctions.parseToDouble(w.substring( MIN_PREFIX.length() )); - if ( d < min ) - min = d; - } - else if(w.startsWith(MAX_PREFIX)) { - d = UtilFunctions.parseToDouble(w.substring( MAX_PREFIX.length() )); - if ( d > max ) - max = d; - } - else if (w.startsWith(NBINS_PREFIX)) { - nbins = (int) UtilFunctions.parseToLong( w.substring(NBINS_PREFIX.length() ) ); - } - else - throw new RuntimeException("MVImputeAgent: Invalid prefix while merging map output: " + w); - } - - // write merged metadata - double binwidth = (max-min)/nbins; - writeTfMtd(colID, Double.toString(min), Double.toString(max), Double.toString(binwidth), Integer.toString(nbins), outputDir, fs, agents); - } - - - public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException { - if( !isApplicable() ) - return; - - MVImputeAgent mvagent = agents.getMVImputeAgent(); - for(int i=0; i < _colList.length; i++) { - int colID = _colList[i]; - - // If the column is imputed with a constant, then adjust min and max based the value of the constant. - if ( mvagent.isApplicable(colID) != -1 && mvagent.getMethod(colID) == MVMethod.CONSTANT ) - { - double cst = UtilFunctions.parseToDouble( mvagent.getReplacement(colID) ); - if ( cst < _min[i]) - _min[i] = cst; - if ( cst > _max[i]) - _max[i] = cst; - } - - double binwidth = (_max[i] - _min[i])/_numBins[i]; - writeTfMtd(colID, Double.toString(_min[i]), Double.toString(_max[i]), Double.toString(binwidth), Integer.toString(_numBins[i]), outputDir, fs, agents); - } - } - - // ------------------------------------------------------------------------------------------------ - - /** - * Method to load transform metadata for all attributes - */ - @Override - public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - if( !isApplicable() ) - return; - - if(fs.isDirectory(txMtdDir)) { - for(int i=0; i<_colList.length;i++) { - int colID = _colList[i]; - - Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); - TfUtils.checkValidInputFile(fs, path, true); - - BufferedReader br = null; - try { - br = new BufferedReader(new InputStreamReader(fs.open(path))); - // format: colID,min,max,nbins - String[] fields = br.readLine().split(TfUtils.TXMTD_SEP); - double min = UtilFunctions.parseToDouble(fields[1]); - //double max = UtilFunctions.parseToDouble(fields[2]); - double binwidth = UtilFunctions.parseToDouble(fields[3]); - int nbins = UtilFunctions.parseToInt(fields[4]); - - _numBins[i] = nbins; - _min[i] = min; - _binWidths[i] = binwidth; // (max-min)/nbins; - } - finally { - IOUtilFunctions.closeSilently(br); - } - } - } - else { - throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir); - } - } - - - @Override - public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - build(in); - return apply(in, out); - } - - @Override - public void build(FrameBlock in) { - // TODO Auto-generated method stub - } - - /** - * Method to apply transformations. 
- */ - @Override - public String[] apply(String[] words) { - if( !isApplicable() ) - return words; - - for(int i=0; i < _colList.length; i++) { - int colID = _colList[i]; - try { - double val = UtilFunctions.parseToDouble(words[colID-1]); - int binid = 1; - double tmp = _min[i] + _binWidths[i]; - while(val > tmp && binid < _numBins[i]) { - tmp += _binWidths[i]; - binid++; - } - words[colID-1] = Integer.toString(binid); - } - catch(NumberFormatException e) { - throw new RuntimeException("Encountered \"" + words[colID-1] + "\" in column ID \"" + colID + "\", when expecting a numeric value. Consider adding \"" + words[colID-1] + "\" to na.strings, along with an appropriate imputation method."); - } - } - - return words; - } - - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out) { - for(int j=0; j<_colList.length; j++) { - int colID = _colList[j]; - for( int i=0; i<in.getNumRows(); i++ ) { - double inVal = UtilFunctions.objectToDouble( - in.getSchema()[colID-1], in.get(i, colID-1)); - int ix = Arrays.binarySearch(_binMaxs[j], inVal); - int binID = ((ix < 0) ? Math.abs(ix+1) : ix) + 1; - out.quickSetValue(i, colID-1, binID); - } - } - return out; - } - - @Override - public FrameBlock getMetaData(FrameBlock meta) { - return meta; - } - - @Override - public void initMetaData(FrameBlock meta) { - _binMins = new double[_colList.length][]; - _binMaxs = new double[_colList.length][]; - for( int j=0; j<_colList.length; j++ ) { - int colID = _colList[j]; //1-based - int nbins = (int)meta.getColumnMetadata()[colID-1].getNumDistinct(); - _binMins[j] = new double[nbins]; - _binMaxs[j] = new double[nbins]; - for( int i=0; i<nbins; i++ ) { - String[] tmp = meta.get(i, colID-1).toString().split(Lop.DATATYPE_PREFIX); - _binMins[j][i] = Double.parseDouble(tmp[0]); - _binMaxs[j][i] = Double.parseDouble(tmp[1]); - } - } - } -}