http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java deleted file mode 100644 index 4a7839d..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java +++ /dev/null @@ -1,1496 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.regex.Pattern; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.wink.json4j.JSONArray; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; - -import scala.Tuple2; - -import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.conf.DMLConfig; -import org.apache.sysml.lops.CSVReBlock; -import org.apache.sysml.lops.Lop; -import org.apache.sysml.lops.LopProperties.ExecType; -import org.apache.sysml.parser.Expression.DataType; -import org.apache.sysml.parser.Expression.ValueType; -import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.caching.CacheableData; -import org.apache.sysml.runtime.controlprogram.caching.FrameObject; -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.instructions.Instruction; -import org.apache.sysml.runtime.instructions.InstructionParser; -import org.apache.sysml.runtime.instructions.MRJobInstruction; -import org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction; -import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction; -import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction; -import 
org.apache.sysml.runtime.instructions.spark.data.RDDObject; -import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; -import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.matrix.CSVReblockMR; -import org.apache.sysml.runtime.matrix.CSVReblockMR.AssignRowIDMRReturn; -import org.apache.sysml.runtime.matrix.JobReturn; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FileFormatProperties; -import org.apache.sysml.runtime.matrix.data.InputInfo; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.OutputInfo; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; -import org.apache.sysml.runtime.util.MapReduceTool; -import org.apache.sysml.runtime.util.UtilFunctions; -import org.apache.sysml.utils.JSONHelper; - -public class DataTransform -{ - private static final String ERROR_MSG_ZERO_ROWS = "Number of rows in the transformed output (potentially, after ommitting the ones with missing values) is zero. Cannot proceed."; - - - /** - * Method to read the header line from the input data file. - * - * @param fs file system - * @param prop csv file format properties - * @param smallestFile file name - * @return header line - * @throws IOException if IOException occurs - */ - private static String readHeaderLine(FileSystem fs, CSVFileFormatProperties prop, String smallestFile) throws IOException { - String line = null; - try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(smallestFile)))) ) { - line = br.readLine(); - } - if(prop.hasHeader()) { - ; // nothing here - } - else - { - // construct header with default column names, V1, V2, etc. - int ncol = Pattern.compile( Pattern.quote(prop.getDelim()) ).split(line, -1).length; - line = null; - - StringBuilder sb = new StringBuilder(); - sb.append("V1"); - for(int i=2; i <= ncol; i++) - sb.append(prop.getDelim() + "V" + i); - line = sb.toString(); - } - return line; - } - - /** - * Method to construct a mapping between column names and their - * corresponding column IDs. The mapping is used to prepare the - * specification file in <code>processSpecFile()</code>. - * - * @param fs file system - * @param prop csv file format properties - * @param headerLine header line - * @param smallestFile file name - * @return map of column names and their column IDs - * @throws IllegalArgumentException if IllegalArgumentException occurs - * @throws IOException if IOException occurs - */ - private static HashMap<String, Integer> processColumnNames(FileSystem fs, CSVFileFormatProperties prop, String headerLine, String smallestFile) throws IllegalArgumentException, IOException { - HashMap<String, Integer> colNames = new HashMap<String,Integer>(); - - String escapedDelim = Pattern.quote(prop.getDelim()); - Pattern compiledDelim = Pattern.compile(escapedDelim); - String[] names = compiledDelim.split(headerLine, -1); - - for(int i=0; i< names.length; i++) - colNames.put(UtilFunctions.unquote(names[i].trim()), i+1); - - return colNames; - } - - /** - * In-place permutation of list, mthd, and cst arrays based on indices, - * by navigating through cycles in the permutation. - * - * @param list ? - * @param mthd ? - * @param cst ? - * @param indices ? 
- */ - private static void inplacePermute(int[] list, byte[] mthd, Object[] cst, Integer[] indices) - { - int x; - byte xb = 0; - Object xo = null; - - int j, k; - for(int i=0; i < list.length; i++) - { - x = list[i]; - xb = mthd[i]; - if ( cst != null ) xo = cst[i]; - - j = i; - while(true) { - k = indices[j]; - indices[j] = j; - - if (k == i) - break; - - list[j] = list[k]; - mthd[j] = mthd[k]; - if ( cst != null ) cst[j] = cst[k]; - j = k; - } - list[j] = x; - mthd[j] = xb; - if ( cst != null ) cst[j] = xo; - } - - } - - /** - * Convert input transformation specification file with column names into a - * specification with corresponding column Ids. This file is sent to all the - * relevant MR jobs. - * - * @param fs file system - * @param inputPath input file path - * @param smallestFile file name - * @param colNames column names - * @param prop csv file format properties - * @param specFileWithNames ? - * @return specification as a JSONObject - * @throws IllegalArgumentException if IllegalArgumentException occurs - * @throws IOException if IOException occurs - * @throws JSONException if JSONException occurs - */ - private static String processSpecFile(FileSystem fs, String inputPath, String smallestFile, HashMap<String,Integer> colNames, CSVFileFormatProperties prop, String specWithNames) throws IllegalArgumentException, IOException, JSONException { - JSONObject inputSpec = new JSONObject(specWithNames); - - final String NAME = "name"; - final String ID = "id"; - final String METHOD = "method"; - final String VALUE = "value"; - final String MV_METHOD_MEAN = "global_mean"; - final String MV_METHOD_MODE = "global_mode"; - final String MV_METHOD_CONSTANT = "constant"; - final String BIN_METHOD_WIDTH = "equi-width"; - final String BIN_METHOD_HEIGHT = "equi-height"; - final String SCALE_METHOD_Z = "z-score"; - final String SCALE_METHOD_M = "mean-subtraction"; - final String JSON_BYPOS = "ids"; - - String stmp = null; - JSONObject entry = null; - byte btmp = 0; - - final int[] mvList; - int[] rcdList, dcdList, omitList; - final int[] binList; - final int[] scaleList; - byte[] mvMethods = null, binMethods=null, scaleMethods=null; - Object[] numBins = null; - Object[] mvConstants = null; - - boolean byPositions = (inputSpec.containsKey(JSON_BYPOS) && ((Boolean)inputSpec.get(JSON_BYPOS)).booleanValue() == true); - - // -------------------------------------------------------------------------- - // Omit - if( inputSpec.containsKey(TfUtils.TXMETHOD_OMIT) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_OMIT); - omitList = new int[arrtmp.size()]; - for(int i=0; i<arrtmp.size(); i++) { - if(byPositions) - omitList[i] = UtilFunctions.toInt( arrtmp.get(i) ); - else { - stmp = UtilFunctions.unquote( (String)arrtmp.get(i) ); - omitList[i] = colNames.get(stmp); - } - } - Arrays.sort(omitList); - } - else - omitList = null; - // -------------------------------------------------------------------------- - // Missing value imputation - if( inputSpec.containsKey(TfUtils.TXMETHOD_IMPUTE) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_IMPUTE); - - mvList = new int[arrtmp.size()]; - mvMethods = new byte[arrtmp.size()]; - mvConstants = new Object[arrtmp.size()]; - - for(int i=0; i<arrtmp.size(); i++) { - entry = (JSONObject)arrtmp.get(i); - if (byPositions) { - mvList[i] = UtilFunctions.toInt(entry.get(ID)); - } - else { - stmp = UtilFunctions.unquote((String) entry.get(NAME)); - mvList[i] = colNames.get(stmp); - } - - stmp = UtilFunctions.unquote((String) 
entry.get(METHOD)); - if(stmp.equals(MV_METHOD_MEAN)) - btmp = (byte)1; - else if ( stmp.equals(MV_METHOD_MODE)) - btmp = (byte)2; - else if ( stmp.equals(MV_METHOD_CONSTANT)) - btmp = (byte)3; - else - throw new IOException("Unknown missing value imputation method (" + stmp + ") in transformation specification: " + specWithNames); - mvMethods[i] = btmp; - - //txMethods.add( btmp ); - - mvConstants[i] = null; - if ( entry.containsKey(VALUE) ) - mvConstants[i] = entry.get(VALUE); - } - - Integer[] idx = new Integer[mvList.length]; - for(int i=0; i < mvList.length; i++) - idx[i] = i; - Arrays.sort(idx, new Comparator<Integer>() { - @Override - public int compare(Integer o1, Integer o2) { - return (mvList[o1]-mvList[o2]); - } - }); - - // rearrange mvList, mvMethods, and mvConstants according to permutation idx - inplacePermute(mvList, mvMethods, mvConstants, idx); - } - else - mvList = null; - // -------------------------------------------------------------------------- - // Recoding - if( inputSpec.containsKey(TfUtils.TXMETHOD_RECODE) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_RECODE); - rcdList = new int[arrtmp.size()]; - for(int i=0; i<arrtmp.size(); i++) { - if (byPositions) - rcdList[i] = UtilFunctions.toInt(arrtmp.get(i)); - else { - stmp = UtilFunctions.unquote( (String)arrtmp.get(i) ); - rcdList[i] = colNames.get(stmp); - } - } - Arrays.sort(rcdList); - } - else - rcdList = null; - // -------------------------------------------------------------------------- - // Binning - if( inputSpec.containsKey(TfUtils.TXMETHOD_BIN) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_BIN); - - binList = new int[arrtmp.size()]; - binMethods = new byte[arrtmp.size()]; - numBins = new Object[arrtmp.size()]; - - for(int i=0; i<arrtmp.size(); i++) { - entry = (JSONObject)arrtmp.get(i); - - if (byPositions) { - binList[i] = UtilFunctions.toInt(entry.get(ID)); - } - else { - stmp = UtilFunctions.unquote((String) entry.get(NAME)); - binList[i] = colNames.get(stmp); - } - stmp = UtilFunctions.unquote((String) entry.get(METHOD)); - if(stmp.equals(BIN_METHOD_WIDTH)) - btmp = (byte)1; - else if ( stmp.equals(BIN_METHOD_HEIGHT)) - throw new IOException("Equi-height binning method is not yet supported, in transformation specification: " + specWithNames); - else - throw new IOException("Unknown missing value imputation method (" + stmp + ") in transformation specification: " + specWithNames); - binMethods[i] = btmp; - - numBins[i] = entry.get(TfUtils.JSON_NBINS); - if ( ((Integer) numBins[i]).intValue() <= 1 ) - throw new IllegalArgumentException("Invalid transformation on column \"" + (String) entry.get(NAME) + "\". 
Number of bins must be greater than 1."); - } - - Integer[] idx = new Integer[binList.length]; - for(int i=0; i < binList.length; i++) - idx[i] = i; - Arrays.sort(idx, new Comparator<Integer>() { - @Override - public int compare(Integer o1, Integer o2) { - return (binList[o1]-binList[o2]); - } - }); - - // rearrange binList and binMethods according to permutation idx - inplacePermute(binList, binMethods, numBins, idx); - } - else - binList = null; - // -------------------------------------------------------------------------- - // Dummycoding - if( inputSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_DUMMYCODE); - dcdList = new int[arrtmp.size()]; - for(int i=0; i<arrtmp.size(); i++) { - if (byPositions) - dcdList[i] = UtilFunctions.toInt(arrtmp.get(i)); - else { - stmp = UtilFunctions.unquote( (String)arrtmp.get(i) ); - dcdList[i] = colNames.get(stmp); - } - } - Arrays.sort(dcdList); - } - else - dcdList = null; - // -------------------------------------------------------------------------- - // Scaling - if(inputSpec.containsKey(TfUtils.TXMETHOD_SCALE) ) { - JSONArray arrtmp = (JSONArray) inputSpec.get(TfUtils.TXMETHOD_SCALE); - - scaleList = new int[arrtmp.size()]; - scaleMethods = new byte[arrtmp.size()]; - - for(int i=0; i<arrtmp.size(); i++) { - entry = (JSONObject)arrtmp.get(i); - - if (byPositions) { - scaleList[i] = UtilFunctions.toInt(entry.get(ID)); - } - else { - stmp = UtilFunctions.unquote((String) entry.get(NAME)); - scaleList[i] = colNames.get(stmp); - } - stmp = UtilFunctions.unquote((String) entry.get(METHOD)); - if(stmp.equals(SCALE_METHOD_M)) - btmp = (byte)1; - else if ( stmp.equals(SCALE_METHOD_Z)) - btmp = (byte)2; - else - throw new IOException("Unknown missing value imputation method (" + stmp + ") in transformation specification: " + specWithNames); - scaleMethods[i] = btmp; - } - - Integer[] idx = new Integer[scaleList.length]; - for(int i=0; i < scaleList.length; i++) - idx[i] = i; - Arrays.sort(idx, new Comparator<Integer>() { - @Override - public int compare(Integer o1, Integer o2) { - return (scaleList[o1]-scaleList[o2]); - } - }); - - // rearrange scaleList and scaleMethods according to permutation idx - inplacePermute(scaleList, scaleMethods, null, idx); - } - else - scaleList = null; - // -------------------------------------------------------------------------- - - // check for column IDs that are imputed with mode, but not recoded - // These columns have be handled separately, because the computation of mode - // requires the computation of distinct values (i.e., recode maps) - ArrayList<Integer> tmpList = new ArrayList<Integer>(); - if(mvList != null) - for(int i=0; i < mvList.length; i++) { - int colID = mvList[i]; - if(mvMethods[i] == 2 && (rcdList == null || Arrays.binarySearch(rcdList, colID) < 0) ) - tmpList.add(colID); - } - - int[] mvrcdList = null; - if ( tmpList.size() > 0 ) { - mvrcdList = new int[tmpList.size()]; - for(int i=0; i < tmpList.size(); i++) - mvrcdList[i] = tmpList.get(i); - } - // Perform Validity Checks - - /* - OMIT MVI RCD BIN DCD SCL - OMIT - x * * * * - MVI x - * * * * - RCD * * - x * x - BIN * * x - * x - DCD * * * * - x - SCL * * x x x - - */ - - if(mvList != null) - for(int i=0; i < mvList.length; i++) - { - int colID = mvList[i]; - - if ( omitList != null && Arrays.binarySearch(omitList, colID) >= 0 ) - throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". 
A column can not be both omitted and imputed."); - - if(mvMethods[i] == 1) - { - if ( rcdList != null && Arrays.binarySearch(rcdList, colID) >= 0 ) - throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A numeric column can not be recoded."); - - if ( dcdList != null && Arrays.binarySearch(dcdList, colID) >= 0 ) - // throw an error only if the column is not binned - if ( binList == null || Arrays.binarySearch(binList, colID) < 0 ) - throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A numeric column can not be dummycoded."); - } - } - - if(scaleList != null) - for(int i=0; i < scaleList.length; i++) - { - int colID = scaleList[i]; - if ( rcdList != null && Arrays.binarySearch(rcdList, colID) >= 0 ) - throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be recoded and scaled."); - if ( binList != null && Arrays.binarySearch(binList, colID) >= 0 ) - throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be binned and scaled."); - if ( dcdList != null && Arrays.binarySearch(dcdList, colID) >= 0 ) - throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be dummycoded and scaled."); - } - - if(rcdList != null) - for(int i=0; i < rcdList.length; i++) - { - int colID = rcdList[i]; - if ( binList != null && Arrays.binarySearch(binList, colID) >= 0 ) - throw new IllegalArgumentException("Invalid transformations on column ID " + colID + ". A column can not be recoded and binned."); - } - - // Check if dummycoded columns are either recoded or binned. - // If not, add them to recode list. - ArrayList<Integer> addToRcd = new ArrayList<Integer>(); - if(dcdList != null) - for(int i=0; i < dcdList.length; i++) - { - int colID = dcdList[i]; - boolean isRecoded = (rcdList != null && Arrays.binarySearch(rcdList, colID) >= 0); - boolean isBinned = (binList != null && Arrays.binarySearch(binList, colID) >= 0); - // If colID is neither recoded nor binned, then, add it to rcdList. - if ( !isRecoded && !isBinned ) - addToRcd.add(colID); - } - if ( addToRcd.size() > 0 ) - { - int[] newRcdList = null; - if ( rcdList != null) - newRcdList = Arrays.copyOf(rcdList, rcdList.length + addToRcd.size()); - else - newRcdList = new int[addToRcd.size()]; - - int i = (rcdList != null ? 
rcdList.length : 0); - for(int idx=0; i < newRcdList.length; i++, idx++) - newRcdList[i] = addToRcd.get(idx); - Arrays.sort(newRcdList); - rcdList = newRcdList; - } - // ----------------------------------------------------------------------------- - - // Prepare output spec - JSONObject outputSpec = new JSONObject(); - - if (omitList != null) - { - JSONObject rcdSpec = new JSONObject(); - rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(omitList)); - outputSpec.put(TfUtils.TXMETHOD_OMIT, rcdSpec); - } - - if (mvList != null) - { - JSONObject mvSpec = new JSONObject(); - mvSpec.put(TfUtils.JSON_ATTRS, toJSONArray(mvList)); - mvSpec.put(TfUtils.JSON_MTHD, toJSONArray(mvMethods)); - mvSpec.put(TfUtils.JSON_CONSTS, toJSONArray(mvConstants)); - outputSpec.put(TfUtils.TXMETHOD_IMPUTE, mvSpec); - } - - if (rcdList != null) - { - JSONObject rcdSpec = new JSONObject(); - rcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(rcdList)); - outputSpec.put(TfUtils.TXMETHOD_RECODE, rcdSpec); - } - - if (binList != null) - { - JSONObject binSpec = new JSONObject(); - binSpec.put(TfUtils.JSON_ATTRS, toJSONArray(binList)); - binSpec.put(TfUtils.JSON_MTHD, toJSONArray(binMethods)); - binSpec.put(TfUtils.JSON_NBINS, toJSONArray(numBins)); - outputSpec.put(TfUtils.TXMETHOD_BIN, binSpec); - } - - if (dcdList != null) - { - JSONObject dcdSpec = new JSONObject(); - dcdSpec.put(TfUtils.JSON_ATTRS, toJSONArray(dcdList)); - outputSpec.put(TfUtils.TXMETHOD_DUMMYCODE, dcdSpec); - } - - if (scaleList != null) - { - JSONObject scaleSpec = new JSONObject(); - scaleSpec.put(TfUtils.JSON_ATTRS, toJSONArray(scaleList)); - scaleSpec.put(TfUtils.JSON_MTHD, toJSONArray(scaleMethods)); - outputSpec.put(TfUtils.TXMETHOD_SCALE, scaleSpec); - } - - if (mvrcdList != null) - { - JSONObject mvrcd = new JSONObject(); - mvrcd.put(TfUtils.JSON_ATTRS, toJSONArray(mvrcdList)); - outputSpec.put(TfUtils.TXMETHOD_MVRCD, mvrcd); - } - - // return output spec with IDs - return outputSpec.toString(); - } - - private static JSONArray toJSONArray(int[] list) - { - JSONArray ret = new JSONArray(list.length); - for(int i=0; i < list.length; i++) - ret.add(list[i]); - return ret; - } - - private static JSONArray toJSONArray(byte[] list) - { - JSONArray ret = new JSONArray(list.length); - for(int i=0; i < list.length; i++) - ret.add(list[i]); - return ret; - } - - private static JSONArray toJSONArray(Object[] list) - throws JSONException - { - return new JSONArray(list); - } - - /** - * Helper function to move transformation metadata files from a temporary - * location to permanent location. These files (e.g., header before and - * after transformation) are generated by a single mapper, while applying - * data transformations. Note that, these files must be ultimately be placed - * under the existing metadata directory (txMtdPath), which is - * simultaneously read by other mappers. If they are not created at a - * temporary location, then MR tasks fail due to changing timestamps on - * txMtdPath. 
- * - * @param fs file system - * @param tmpPath temporary location (directory) of transformation metadata files - * @param txMtdPath directory where to place transformation metadata files - * @throws IllegalArgumentException if IllegalArgumentException occurs - * @throws IOException if IOException occurs - */ - private static void moveFilesFromTmp(FileSystem fs, String tmpPath, String txMtdPath) throws IllegalArgumentException, IOException - { - // move files from temporary location to txMtdPath - MapReduceTool.renameFileOnHDFS(tmpPath + "/" + TfUtils.TXMTD_COLNAMES, txMtdPath + "/" + TfUtils.TXMTD_COLNAMES); - MapReduceTool.renameFileOnHDFS(tmpPath + "/" + TfUtils.TXMTD_DC_COLNAMES, txMtdPath + "/" + TfUtils.TXMTD_DC_COLNAMES); - MapReduceTool.renameFileOnHDFS(tmpPath + "/" + TfUtils.TXMTD_COLTYPES, txMtdPath + "/" + TfUtils.TXMTD_COLTYPES); - - if ( fs.exists(new Path(tmpPath +"/Dummycode/" + TfUtils.DCD_FILE_NAME)) ) - { - if ( !fs.exists( new Path(txMtdPath + "/Dummycode/") )) - fs.mkdirs(new Path(txMtdPath + "/Dummycode/")); - MapReduceTool.renameFileOnHDFS( tmpPath + "/Dummycode/" + TfUtils.DCD_FILE_NAME, txMtdPath + "/Dummycode/" + TfUtils.DCD_FILE_NAME); - } - } - - /** - * Helper function to determine the number of columns after applying - * transformations. Note that dummycoding changes the number of columns. - * - * @param fs file system - * @param header header line - * @param delim delimiter - * @param tfMtdPath transform metadata path - * @return number of columns after applying transformations - * @throws IllegalArgumentException if IllegalArgumentException occurs - * @throws IOException if IOException occurs - * @throws DMLRuntimeException if DMLRuntimeException occurs - * @throws JSONException if JSONException occurs - */ - private static int getNumColumnsTf(FileSystem fs, String header, String delim, String tfMtdPath) throws IllegalArgumentException, IOException, DMLRuntimeException, JSONException { - String[] columnNames = Pattern.compile(Pattern.quote(delim)).split(header, -1); - int ret = columnNames.length; - - JSONObject spec = null; - try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(tfMtdPath + "/spec.json"))))) { - spec = JSONHelper.parse(br); - } - - // fetch relevant attribute lists - if ( !spec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) - return ret; - - JSONArray dcdList = (JSONArray) ((JSONObject)spec.get(TfUtils.TXMETHOD_DUMMYCODE)).get(TfUtils.JSON_ATTRS); - - // look for numBins among binned columns - for(Object o : dcdList) - { - int id = UtilFunctions.toInt(o); - - Path binpath = new Path( tfMtdPath + "/Bin/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_BIN_FILE_SUFFIX); - Path rcdpath = new Path( tfMtdPath + "/Recode/" + UtilFunctions.unquote(columnNames[id-1]) + TfUtils.TXMTD_RCD_DISTINCT_SUFFIX); - - if ( TfUtils.checkValidInputFile(fs, binpath, false ) ) - { - int nbins = -1; - try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(binpath)))) { - nbins = UtilFunctions.parseToInt(br.readLine().split(TfUtils.TXMTD_SEP)[4]); - } - ret += (nbins-1); - } - else if ( TfUtils.checkValidInputFile(fs, rcdpath, false ) ) - { - int ndistinct = -1; - try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(rcdpath))) ) { - ndistinct = UtilFunctions.parseToInt(br.readLine()); - } - ret += (ndistinct-1); - } - else - throw new DMLRuntimeException("Relevant transformation metadata for column (id=" + id + ", name=" + columnNames[id-1] + ") is not found."); - } - - return ret; - } - - 
/** - * Main method to create and/or apply transformation metdata using MapReduce. - * - * @param jobinst MR job instruction - * @param inputs array of input matrices - * @param shuffleInst shuffle instructions - * @param otherInst other instructions - * @param resultIndices byte array of result indices - * @param outputs array of output matrices - * @param numReducers number of reducers - * @param replication ? - * @return MR job result - * @throws Exception if IOException occurs - */ - public static JobReturn mrDataTransform(MRJobInstruction jobinst, MatrixObject[] inputs, String shuffleInst, String otherInst, byte[] resultIndices, MatrixObject[] outputs, int numReducers, int replication) throws Exception { - - String[] insts = shuffleInst.split(Instruction.INSTRUCTION_DELIM); - - // Parse transform instruction (the first instruction) to obtain relevant fields - TransformOperands oprnds = new TransformOperands(insts[0], inputs[0]); - - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - - // find the first file in alphabetical ordering of part files in directory inputPath - String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath); - - // find column names - FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile); - String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile); - HashMap<String, Integer> colNamesToIds = processColumnNames(fs, oprnds.inputCSVProperties, headerLine, smallestFile); - String outHeader = getOutputHeader(fs, headerLine, oprnds); - int numColumns = colNamesToIds.size(); - - int numColumnsTf = 0; - long numRowsTf = 0; - - ArrayList<Integer> csvoutputs= new ArrayList<Integer>(); - ArrayList<Integer> bboutputs = new ArrayList<Integer>(); - - // divide output objects based on output format (CSV or BinaryBlock) - for(int i=0; i < outputs.length; i++) - { - if(outputs[i].getFileFormatProperties() != null - && outputs[i].getFileFormatProperties().getFileFormat() == FileFormatProperties.FileFormat.CSV) - csvoutputs.add(i); - else - bboutputs.add(i); - } - boolean isCSV = (csvoutputs.size() > 0); - boolean isBB = (bboutputs.size() > 0); - String tmpPath = MRJobConfiguration.constructTempOutputFilename(); - - checkIfOutputOverlapsWithTxMtd(outputs, oprnds, isCSV, isBB, csvoutputs, bboutputs, fs); - - JobReturn retCSV = null, retBB = null; - - if (!oprnds.isApply) { - // build specification file with column IDs insteadof column names - String specWithIDs = processSpecFile(fs, oprnds.inputPath, - smallestFile, colNamesToIds, oprnds.inputCSVProperties, oprnds.spec); - colNamesToIds = null; // enable GC on colNamesToIds - - // Build transformation metadata, including recode maps, bin definitions, etc. 
- // Also, generate part offsets file (counters file), which is to be used in csv-reblock - - String partOffsetsFile = MRJobConfiguration.constructTempOutputFilename(); - numRowsTf = GenTfMtdMR.runJob(oprnds.inputPath, oprnds.txMtdPath, specWithIDs, smallestFile, - partOffsetsFile, oprnds.inputCSVProperties, numColumns, replication, outHeader); - - if ( numRowsTf == 0 ) - throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS); - - // store the specFileWithIDs as transformation metadata - MapReduceTool.writeStringToHDFS(specWithIDs, oprnds.txMtdPath + "/" + "spec.json"); - - numColumnsTf = getNumColumnsTf(fs, outHeader, oprnds.inputCSVProperties.getDelim(), oprnds.txMtdPath); - - // Apply transformation metadata, and perform actual transformation - if(isCSV) - retCSV = ApplyTfCSVMR.runJob(oprnds.inputPath, specWithIDs, oprnds.txMtdPath, tmpPath, - outputs[csvoutputs.get(0)].getFileName(), partOffsetsFile, - oprnds.inputCSVProperties, numColumns, replication, outHeader); - - if(isBB) - { - DMLConfig conf = ConfigurationManager.getDMLConfig(); - int blockSize = conf.getIntValue(DMLConfig.DEFAULT_BLOCK_SIZE); - CSVReblockInstruction rblk = prepDummyReblockInstruction(oprnds.inputCSVProperties, blockSize); - - AssignRowIDMRReturn ret1 = CSVReblockMR.runAssignRowIDMRJob(new String[]{oprnds.inputPath}, - new InputInfo[]{InputInfo.CSVInputInfo}, new int[]{blockSize}, new int[]{blockSize}, - rblk.toString(), replication, new String[]{smallestFile}, true, - oprnds.inputCSVProperties.getNAStrings(), specWithIDs); - if ( ret1.rlens[0] == 0 ) - throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS); - - retBB = ApplyTfBBMR.runJob(oprnds.inputPath, insts[1], otherInst, - specWithIDs, oprnds.txMtdPath, tmpPath, outputs[bboutputs.get(0)].getFileName(), - ret1.counterFile.toString(), oprnds.inputCSVProperties, numRowsTf, numColumns, - numColumnsTf, replication, outHeader); - } - - MapReduceTool.deleteFileIfExistOnHDFS(new Path(partOffsetsFile), job); - - } - else { - colNamesToIds = null; // enable GC on colNamesToIds - - // copy given transform metadata (applyTxPath) to specified location (txMtdPath) - MapReduceTool.deleteFileIfExistOnHDFS(new Path(oprnds.txMtdPath), job); - MapReduceTool.copyFileOnHDFS(oprnds.applyTxPath, oprnds.txMtdPath); - - // path to specification file - String specWithIDs = (oprnds.spec != null) ? 
oprnds.spec : - MapReduceTool.readStringFromHDFSFile(oprnds.txMtdPath + "/" + "spec.json"); - numColumnsTf = getNumColumnsTf(fs, outHeader, - oprnds.inputCSVProperties.getDelim(), - oprnds.txMtdPath); - - if (isCSV) - { - DMLConfig conf = ConfigurationManager.getDMLConfig(); - int blockSize = conf.getIntValue(DMLConfig.DEFAULT_BLOCK_SIZE); - CSVReblockInstruction rblk = prepDummyReblockInstruction(oprnds.inputCSVProperties, blockSize); - - AssignRowIDMRReturn ret1 = CSVReblockMR.runAssignRowIDMRJob(new String[]{oprnds.inputPath}, - new InputInfo[]{InputInfo.CSVInputInfo}, new int[]{blockSize}, new int[]{blockSize}, - rblk.toString(), replication, new String[]{smallestFile}, true, - oprnds.inputCSVProperties.getNAStrings(), specWithIDs); - numRowsTf = ret1.rlens[0]; - - if ( ret1.rlens[0] == 0 ) - throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS); - - // Apply transformation metadata, and perform actual transformation - retCSV = ApplyTfCSVMR.runJob(oprnds.inputPath, specWithIDs, oprnds.applyTxPath, tmpPath, - outputs[csvoutputs.get(0)].getFileName(), ret1.counterFile.toString(), - oprnds.inputCSVProperties, numColumns, replication, outHeader); - } - - if(isBB) - { - // compute part offsets file - CSVReblockInstruction rblk = (CSVReblockInstruction) InstructionParser.parseSingleInstruction(insts[1]); - CSVReblockInstruction newrblk = (CSVReblockInstruction) rblk.clone((byte)0); - AssignRowIDMRReturn ret1 = CSVReblockMR.runAssignRowIDMRJob(new String[]{oprnds.inputPath}, - new InputInfo[]{InputInfo.CSVInputInfo}, new int[]{newrblk.brlen}, new int[]{newrblk.bclen}, - newrblk.toString(), replication, new String[]{smallestFile}, true, - oprnds.inputCSVProperties.getNAStrings(), specWithIDs); - numRowsTf = ret1.rlens[0]; - - if ( ret1.rlens[0] == 0 ) - throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS); - - // apply transformation metadata, as well as reblock the resulting data - retBB = ApplyTfBBMR.runJob(oprnds.inputPath, insts[1], otherInst, specWithIDs, - oprnds.txMtdPath, tmpPath, outputs[bboutputs.get(0)].getFileName(), - ret1.counterFile.toString(), oprnds.inputCSVProperties, ret1.rlens[0], - ret1.clens[0], numColumnsTf, replication, outHeader); - } - } - - // copy auxiliary data (old and new header lines) from temporary location to txMtdPath - moveFilesFromTmp(fs, tmpPath, oprnds.txMtdPath); - - // generate matrix metadata file for outputs - if ( retCSV != null ) - { - retCSV.getMatrixCharacteristics(0).setDimension(numRowsTf, numColumnsTf); - - CSVFileFormatProperties prop = new CSVFileFormatProperties( - false, - oprnds.inputCSVProperties.getDelim(), // use the same header as the input - false, Double.NaN, null); - - MapReduceTool.writeMetaDataFile (outputs[csvoutputs.get(0)].getFileName()+".mtd", - ValueType.DOUBLE, retCSV.getMatrixCharacteristics(0), - OutputInfo.CSVOutputInfo, prop); - return retCSV; - } - - if ( retBB != null ) - { - retBB.getMatrixCharacteristics(0).setDimension(numRowsTf, numColumnsTf); - - MapReduceTool.writeMetaDataFile (outputs[bboutputs.get(0)].getFileName()+".mtd", - ValueType.DOUBLE, retBB.getMatrixCharacteristics(0), OutputInfo.BinaryBlockOutputInfo); - return retBB; - } - - return null; - - } - - private static CSVReblockInstruction prepDummyReblockInstruction(CSVFileFormatProperties prop, int blockSize) { - StringBuilder sb = new StringBuilder(); - sb.append( ExecType.MR ); - - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( CSVReBlock.OPCODE ); - - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( "0" ); - sb.append(Lop.DATATYPE_PREFIX); - 
sb.append(DataType.MATRIX); - sb.append(Lop.VALUETYPE_PREFIX); - sb.append(ValueType.DOUBLE); - - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( "1" ); - sb.append(Lop.DATATYPE_PREFIX); - sb.append(DataType.MATRIX); - sb.append(Lop.VALUETYPE_PREFIX); - sb.append(ValueType.DOUBLE); - - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( blockSize ); - - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( blockSize ); - - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( prop.hasHeader() ); - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( prop.getDelim() ); - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( prop.isFill() ); - sb.append( Lop.OPERAND_DELIMITOR ); - sb.append( prop.getFillValue() ); - - return (CSVReblockInstruction) CSVReblockInstruction.parseInstruction(sb.toString()); - } - - private static String getOutputHeader(FileSystem fs, String headerLine, TransformOperands oprnds) throws IOException - { - String ret = null; - - if(oprnds.isApply) - { - try( BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.applyTxPath + "/" + TfUtils.TXMTD_COLNAMES)) )) ) { - ret = br.readLine(); - } - } - else { - if ( oprnds.outNamesFile == null ) - ret = headerLine; - else { - try( BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.outNamesFile)) )) ) { - ret = br.readLine(); - } - } - } - - return ret; - } - - /** - * Main method to create and/or apply transformation metdata in-memory, on a - * single node. - * - * @param inst parameterized built-in CP instruction - * @param inputs array of input data - * @param outputs array of output matrices - * @return MR job result - * @throws IOException if IOException occurs - * @throws DMLRuntimeException if DMLRuntimeException occurs - * @throws IllegalArgumentException if IllegalArgumentException occurs - * @throws JSONException if JSONException occurs - */ - public static JobReturn cpDataTransform(ParameterizedBuiltinCPInstruction inst, CacheableData<?>[] inputs, MatrixObject[] outputs) throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException { - TransformOperands oprnds = new TransformOperands(inst.getParameterMap(), inputs[0]); - return cpDataTransform(oprnds, inputs, outputs); - } - - public static JobReturn cpDataTransform(String inst, CacheableData<?>[] inputs, MatrixObject[] outputs) - throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException - { - String[] insts = inst.split(Instruction.INSTRUCTION_DELIM); - // Parse transform instruction (the first instruction) to obtain relevant fields - TransformOperands oprnds = new TransformOperands(insts[0], inputs[0]); - - return cpDataTransform(oprnds, inputs, outputs); - } - - public static JobReturn cpDataTransform(TransformOperands oprnds, CacheableData<?>[] inputs, MatrixObject[] outputs) - throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException - { - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - - // find the first file in alphabetical ordering of partfiles in directory inputPath - String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath); - FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile); - - // find column names - String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile); - HashMap<String, Integer> colNamesToIds = processColumnNames(fs, oprnds.inputCSVProperties, headerLine, smallestFile); - String outHeader = getOutputHeader(fs, headerLine, oprnds); - - ArrayList<Integer> csvoutputs= 
new ArrayList<Integer>(); - ArrayList<Integer> bboutputs = new ArrayList<Integer>(); - - // divide output objects based on output format (CSV or BinaryBlock) - for(int i=0; i < outputs.length; i++) - { - if(outputs[i].getFileFormatProperties() != null && outputs[i].getFileFormatProperties().getFileFormat() == FileFormatProperties.FileFormat.CSV) - csvoutputs.add(i); - else - bboutputs.add(i); - } - boolean isCSV = (csvoutputs.size() > 0); - boolean isBB = (bboutputs.size() > 0); - - checkIfOutputOverlapsWithTxMtd(outputs, oprnds, isCSV, isBB, csvoutputs, bboutputs, fs); - - JobReturn ret = null; - - if (!oprnds.isApply) { - // build specification file with column IDs insteadof column names - String specWithIDs = processSpecFile(fs, oprnds.inputPath, smallestFile, colNamesToIds, oprnds.inputCSVProperties, - oprnds.spec); - MapReduceTool.writeStringToHDFS(specWithIDs, oprnds.txMtdPath + "/" + "spec.json"); - - ret = performTransform(job, fs, oprnds.inputPath, colNamesToIds.size(), oprnds.inputCSVProperties, specWithIDs, - oprnds.txMtdPath, oprnds.isApply, outputs[0], outHeader, isBB, isCSV ); - } - else { - // copy given transform metadata (applyTxPath) to specified location (txMtdPath) - MapReduceTool.deleteFileIfExistOnHDFS(new Path(oprnds.txMtdPath), job); - MapReduceTool.copyFileOnHDFS(oprnds.applyTxPath, oprnds.txMtdPath); - - // path to specification file (optionally specified) - String specWithIDs = (oprnds.spec != null) ? - oprnds.spec : MapReduceTool.readStringFromHDFSFile(oprnds.txMtdPath + "/" + "spec.json"); - - ret = performTransform(job, fs, oprnds.inputPath, colNamesToIds.size(), oprnds.inputCSVProperties, specWithIDs, - oprnds.txMtdPath, oprnds.isApply, outputs[0], outHeader, isBB, isCSV ); - } - - return ret; - } - - /** - * Helper function to fetch and sort the list of part files under the given - * input directory. 
- * - * @param input input directory - * @param fs file system - * @return list of paths to input file parts - * @throws FileNotFoundException if FileNotFoundException occurs - * @throws IOException if IOException occurs - */ - @SuppressWarnings("unchecked") - private static ArrayList<Path> collectInputFiles(String input, FileSystem fs) throws FileNotFoundException, IOException - { - Path path = new Path(input); - ArrayList<Path> files=new ArrayList<Path>(); - if(fs.isDirectory(path)) - { - for(FileStatus stat: fs.listStatus(path, CSVReblockMR.hiddenFileFilter)) - files.add(stat.getPath()); - Collections.sort(files); - } - else - files.add(path); - - return files; - } - - private static int[] countNumRows(ArrayList<Path> files, CSVFileFormatProperties prop, FileSystem fs, TfUtils agents) throws IOException - { - int[] rows = new int[2]; - int numRows=0, numRowsTf=0; - - OmitAgent oa = agents.getOmitAgent(); - - if(!oa.isApplicable()) - { - for(int fileNo=0; fileNo<files.size(); fileNo++) - { - try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo))))) { - if(fileNo==0 && prop.hasHeader() ) - br.readLine(); //ignore header - - while ( br.readLine() != null) - numRows++; - } - } - numRowsTf = numRows; - } - else - { - String line = null; - String[] words; - - Pattern delim = Pattern.compile(Pattern.quote(prop.getDelim())); - - for(int fileNo=0; fileNo<files.size(); fileNo++) - { - try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))) ) { - if(fileNo==0 && prop.hasHeader() ) - br.readLine(); //ignore header - while ( (line=br.readLine()) != null) - { - numRows++; - - words = delim.split(line, -1); - if(!oa.omit(words, agents)) - numRowsTf++; - } - } - } - } - - rows[0] = numRows; - rows[1] = numRowsTf; - - return rows; - } - - /** - * Main method to create and/or apply transformation metdata in-memory, on a single node. - * - * @param job job configuration - * @param fs file system - * @param inputPath path to input files - * @param ncols number of columns - * @param prop csv file format properties - * @param specWithIDs JSON transform specification with IDs - * @param tfMtdPath transform metadata path - * @param isApply ? 
- * @param result output matrix - * @param headerLine header line - * @param isBB true if binary block - * @param isCSV true if CSV - * @return MR job result - * @throws IOException if IOException occurs - * @throws DMLRuntimeException if DMLRuntimeException occurs - * @throws IllegalArgumentException if IllegalArgumentException occurs - * @throws JSONException if JSONException occurs - */ - private static JobReturn performTransform(JobConf job, FileSystem fs, String inputPath, int ncols, CSVFileFormatProperties prop, String specWithIDs, String tfMtdPath, boolean isApply, MatrixObject result, String headerLine, boolean isBB, boolean isCSV ) throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException { - - String[] na = TfUtils.parseNAStrings(prop.getNAStrings()); - - JSONObject spec = new JSONObject(specWithIDs); - TfUtils agents = new TfUtils(headerLine, prop.hasHeader(), prop.getDelim(), na, spec, ncols, tfMtdPath, null, null ); - - MVImputeAgent _mia = agents.getMVImputeAgent(); - RecodeAgent _ra = agents.getRecodeAgent(); - BinAgent _ba = agents.getBinAgent(); - DummycodeAgent _da = agents.getDummycodeAgent(); - - // List of files to read - ArrayList<Path> files = collectInputFiles(inputPath, fs); - - // --------------------------------- - // Construct transformation metadata - // --------------------------------- - - String line = null; - String[] words = null; - - int numColumnsTf=0; - - if (!isApply) { - for(int fileNo=0; fileNo<files.size(); fileNo++) - { - try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))) ) { - if(fileNo==0 && prop.hasHeader() ) - br.readLine(); //ignore header - - line = null; - while ( (line = br.readLine()) != null) { - agents.prepareTfMtd(line); - } - } - } - - if(agents.getValid() == 0) - throw new DMLRuntimeException(ERROR_MSG_ZERO_ROWS); - - _mia.outputTransformationMetadata(tfMtdPath, fs, agents); - _ba.outputTransformationMetadata(tfMtdPath, fs, agents); - _ra.outputTransformationMetadata(tfMtdPath, fs, agents); - - // prepare agents for the subsequent phase of applying transformation metadata - - // NO need to loadTxMtd for _ra, since the maps are already present in the memory - Path tmp = new Path(tfMtdPath); - _mia.loadTxMtd(job, fs, tmp, agents); - _ba.loadTxMtd(job, fs, tmp, agents); - - _da.setRecodeMapsCP( _ra.getCPRecodeMaps() ); - _da.setNumBins(_ba.getColList(), _ba.getNumBins()); - _da.loadTxMtd(job, fs, tmp, agents); - } - else { - // Count the number of rows - int rows[] = countNumRows(files, prop, fs, agents); - agents.setTotal(rows[0]); - agents.setValid(rows[1]); - - if(agents.getValid() == 0) - throw new DMLRuntimeException("Number of rows in the transformed output (potentially, after ommitting the ones with missing values) is zero. 
Cannot proceed."); - - // Load transformation metadata - // prepare agents for the subsequent phase of applying transformation metadata - Path tmp = new Path(tfMtdPath); - _mia.loadTxMtd(job, fs, tmp, agents); - _ra.loadTxMtd(job, fs, tmp, agents); - _ba.loadTxMtd(job, fs, tmp, agents); - - _da.setRecodeMaps( _ra.getRecodeMaps() ); - _da.setNumBins(_ba.getColList(), _ba.getNumBins()); - _da.loadTxMtd(job, fs, tmp, agents); - } - - // ----------------------------- - // Apply transformation metadata - // ----------------------------- - - numColumnsTf = getNumColumnsTf(fs, headerLine, prop.getDelim(), tfMtdPath); - - MapReduceTool.deleteFileIfExistOnHDFS(result.getFileName()); - BufferedWriter out=new BufferedWriter(new OutputStreamWriter(fs.create(new Path(result.getFileName()),true))); - StringBuilder sb = new StringBuilder(); - - try { - MatrixBlock mb = null; - if ( isBB ) - { - int estNNZ = (int)agents.getValid() * ncols; - mb = new MatrixBlock((int)agents.getValid(), numColumnsTf, estNNZ ); - - if ( mb.isInSparseFormat() ) - mb.allocateSparseRowsBlock(); - else - mb.allocateDenseBlock(); - } - - int rowID = 0; // rowid to be used in filling the matrix block - - for(int fileNo=0; fileNo<files.size(); fileNo++) - { - try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))) ) { - if ( fileNo == 0 ) - { - if ( prop.hasHeader() ) - br.readLine(); // ignore the header line from data file - - //TODO: fix hard-wired header propagation to meta data column names - - String dcdHeader = _da.constructDummycodedHeader(headerLine, agents.getDelim()); - numColumnsTf = _da.genDcdMapsAndColTypes(fs, tfMtdPath, ncols, agents); - generateHeaderFiles(fs, tfMtdPath, headerLine, dcdHeader); - } - - line = null; - while ( (line = br.readLine()) != null) { - words = agents.getWords(line); - - if(!agents.omit(words)) - { - words = agents.apply(words); - - if (isCSV) - { - out.write( agents.checkAndPrepOutputString(words, sb) ); - out.write("\n"); - } - - if( isBB ) - { - agents.check(words); - for(int c=0; c<words.length; c++) - { - if(words[c] == null || words[c].isEmpty()) - ; - else - mb.appendValue(rowID, c, UtilFunctions.parseToDouble(words[c])); - } - } - rowID++; - } - } - } - } - - if(mb != null) - { - mb.recomputeNonZeros(); - mb.examSparsity(); - - result.acquireModify(mb); - result.release(); - result.exportData(); - } - } - finally { - IOUtilFunctions.closeSilently(out); - } - - MatrixCharacteristics mc = new MatrixCharacteristics(agents.getValid(), numColumnsTf, (int) result.getNumRowsPerBlock(), (int) result.getNumColumnsPerBlock()); - JobReturn ret = new JobReturn(new MatrixCharacteristics[]{mc}, true); - - return ret; - } - - public static void generateHeaderFiles(FileSystem fs, String txMtdDir, String origHeader, String newHeader) throws IOException { - // write out given header line - try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter( - fs.create(new Path(txMtdDir+"/" + TfUtils.TXMTD_COLNAMES),true)))) { - br.write(origHeader+"\n"); - } - - // write out the new header line (after all transformations) - try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter( - fs.create(new Path(txMtdDir + "/" + TfUtils.TXMTD_DC_COLNAMES),true))) ){ - br.write(newHeader+"\n"); - } - } - - private static void checkIfOutputOverlapsWithTxMtd(MatrixObject[] outputs, TransformOperands oprnds, - boolean isCSV, boolean isBB, ArrayList<Integer> csvoutputs, ArrayList<Integer> bboutputs, FileSystem fs) throws DMLRuntimeException { - if(isCSV) { - 
checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, outputs[csvoutputs.get(0)].getFileName(), fs); - } - else if(isBB) { - checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, outputs[bboutputs.get(0)].getFileName(), fs); - } - } - - @SuppressWarnings("deprecation") - private static void checkIfOutputOverlapsWithTxMtd(String txMtdPath, String outputPath, FileSystem fs) - throws DMLRuntimeException - { - Path path1 = new Path(txMtdPath).makeQualified(fs); - Path path2 = new Path(outputPath).makeQualified(fs); - - String fullTxMtdPath = path1.toString(); - String fullOutputPath = path2.toString(); - - if(path1.getParent().toString().equals(path2.getParent().toString())) { - // Both txMtdPath and outputPath are in same folder, but outputPath can have suffix - if(fullTxMtdPath.equals(fullOutputPath)) { - throw new DMLRuntimeException("The transform path \'" + txMtdPath - + "\' cannot overlap with the output path \'" + outputPath + "\'"); - } - } - else if(fullTxMtdPath.startsWith(fullOutputPath) || fullOutputPath.startsWith(fullTxMtdPath)) { - throw new DMLRuntimeException("The transform path \'" + txMtdPath - + "\' cannot overlap with the output path \'" + outputPath + "\'"); - } - } - - public static void spDataTransform(ParameterizedBuiltinSPInstruction inst, FrameObject[] inputs, MatrixObject[] outputs, ExecutionContext ec) throws Exception { - - SparkExecutionContext sec = (SparkExecutionContext)ec; - - // Parse transform instruction (the first instruction) to obtain relevant fields - TransformOperands oprnds = new TransformOperands(inst.getParams(), inputs[0]); - - JobConf job = new JobConf(); - FileSystem fs = IOUtilFunctions.getFileSystem(inputs[0].getFileName()); - - checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, outputs[0].getFileName(), fs); - - // find the first file in alphabetical ordering of partfiles in directory inputPath - String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath); - - // find column names and construct output header - String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile); - HashMap<String, Integer> colNamesToIds = processColumnNames(fs, oprnds.inputCSVProperties, headerLine, smallestFile); - int numColumns = colNamesToIds.size(); - String outHeader = getOutputHeader(fs, headerLine, oprnds); - - String tmpPath = MRJobConfiguration.constructTempOutputFilename(); - - // Construct RDD for input data - @SuppressWarnings("unchecked") - JavaPairRDD<LongWritable, Text> inputData = (JavaPairRDD<LongWritable, Text>) sec.getRDDHandleForFrameObject(inputs[0], InputInfo.CSVInputInfo); - JavaRDD<Tuple2<LongWritable,Text>> csvLines = JavaPairRDD.toRDD(inputData).toJavaRDD(); - - long numRowsTf=0, numColumnsTf=0; - JavaPairRDD<Long, String> tfPairRDD = null; - - if (!oprnds.isApply) { - // build specification file with column IDs insteadof column names - String specWithIDs = processSpecFile(fs, oprnds.inputPath, smallestFile, - colNamesToIds, oprnds.inputCSVProperties, oprnds.spec); - colNamesToIds = null; // enable GC on colNamesToIds - - // Build transformation metadata, including recode maps, bin definitions, etc. 
- // Also, generate part offsets file (counters file), which is to be used in csv-reblock (if needed) - String partOffsetsFile = MRJobConfiguration.constructTempOutputFilename(); - numRowsTf = GenTfMtdSPARK.runSparkJob(sec, csvLines, oprnds.txMtdPath, - specWithIDs,partOffsetsFile, - oprnds.inputCSVProperties, numColumns, - outHeader); - - // store the specFileWithIDs as transformation metadata - MapReduceTool.writeStringToHDFS(specWithIDs, oprnds.txMtdPath + "/" + "spec.json"); - - numColumnsTf = getNumColumnsTf(fs, outHeader, oprnds.inputCSVProperties.getDelim(), oprnds.txMtdPath); - - tfPairRDD = ApplyTfCSVSPARK.runSparkJob(sec, csvLines, oprnds.txMtdPath, - specWithIDs, tmpPath, oprnds.inputCSVProperties, numColumns, outHeader); - - - MapReduceTool.deleteFileIfExistOnHDFS(new Path(partOffsetsFile), job); - } - else { - colNamesToIds = null; // enable GC on colNamesToIds - - // copy given transform metadata (applyTxPath) to specified location (txMtdPath) - MapReduceTool.deleteFileIfExistOnHDFS(new Path(oprnds.txMtdPath), job); - MapReduceTool.copyFileOnHDFS(oprnds.applyTxPath, oprnds.txMtdPath); - - // path to specification file - String specWithIDs = (oprnds.spec != null) ? oprnds.spec : - MapReduceTool.readStringFromHDFSFile(oprnds.txMtdPath + "/" + "spec.json"); - numColumnsTf = getNumColumnsTf(fs, outHeader, - oprnds.inputCSVProperties.getDelim(), - oprnds.txMtdPath); - - // Apply transformation metadata, and perform actual transformation - tfPairRDD = ApplyTfCSVSPARK.runSparkJob(sec, csvLines, oprnds.txMtdPath, - specWithIDs, tmpPath, oprnds.inputCSVProperties, numColumns, outHeader); - - } - - // copy auxiliary data (old and new header lines) from temporary location to txMtdPath - moveFilesFromTmp(fs, tmpPath, oprnds.txMtdPath); - - // convert to csv output format (serialized longwritable/text) - JavaPairRDD<LongWritable, Text> outtfPairRDD = - RDDConverterUtils.stringToSerializableText(tfPairRDD); - - if ( outtfPairRDD != null ) - { - MatrixObject outMO = outputs[0]; - String outVar = outMO.getVarName(); - outMO.setRDDHandle(new RDDObject(outtfPairRDD, outVar)); - sec.addLineageRDD(outVar, inst.getParams().get("target")); - - //update output statistics (required for correctness) - MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(outVar); - mcOut.setDimension(numRowsTf, numColumnsTf); - mcOut.setNonZeros(-1); - } - } - - - /** - * Private class to hold the relevant input parameters to transform operation. - */ - private static class TransformOperands - { - private String inputPath=null; - private String txMtdPath=null; - private String applyTxPath=null; - private String spec=null; - private String outNamesFile=null; - private boolean isApply=false; - private CSVFileFormatProperties inputCSVProperties = null; - - private TransformOperands(String inst, CacheableData<?> input) { - inputPath = input.getFileName(); - inputCSVProperties = (CSVFileFormatProperties)input.getFileFormatProperties(); - String[] instParts = inst.split(Instruction.OPERAND_DELIM); - txMtdPath = instParts[3]; - applyTxPath = instParts[4].startsWith("applymtd=") ? instParts[4].substring(9) : null; - isApply = (applyTxPath != null); - int pos = (applyTxPath != null) ? 5 : 4; - if( pos<instParts.length ) - spec = instParts[pos].startsWith("spec=") ? instParts[pos++].substring(5) : null; - if( pos<instParts.length ) - outNamesFile = instParts[pos].startsWith("outnames=") ? 
instParts[pos].substring(9) : null; - } - - private TransformOperands(HashMap<String, String> params, CacheableData<?> input) { - inputPath = input.getFileName(); - txMtdPath = params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_MTD); - spec = params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_SPEC); - applyTxPath = params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD); - isApply = (applyTxPath != null); - outNamesFile = params.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_OUTNAMES); // can be null - inputCSVProperties = (CSVFileFormatProperties)input.getFileFormatProperties(); - } - } -} -
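Two of the deleted DataTransform.java helpers above are easy to misread in flattened diff form, so small standalone sketches follow. First, inplacePermute reorders parallel arrays by walking each cycle of a sort permutation; the sketch below is illustrative only (hypothetical class and method names, a single int[] payload instead of the original's three parallel arrays, and an explicit boolean[] visited marker where the original reuses its indices array for that purpose):

import java.util.Arrays;

public class InplacePermuteSketch {
    // Rearranges values so that the result at position i is values[order[i]],
    // walking each cycle of the permutation once and saving only one element.
    static void applyPermutationInPlace(int[] values, int[] order) {
        boolean[] done = new boolean[values.length];
        for (int start = 0; start < values.length; start++) {
            if (done[start])
                continue;
            int saved = values[start];
            int j = start;
            while (true) {
                int k = order[j];
                done[j] = true;
                if (k == start) {
                    values[j] = saved; // close the cycle with the saved element
                    break;
                }
                values[j] = values[k];
                j = k;
            }
        }
    }

    public static void main(String[] args) {
        int[] colIDs = { 30, 10, 20 };
        int[] order  = { 1, 2, 0 }; // sort order of colIDs, as produced by the Arrays.sort(idx, ...) calls above
        applyPermutationInPlace(colIDs, order);
        System.out.println(Arrays.toString(colIDs)); // [10, 20, 30]
    }
}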
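Second, processSpecFile's name-to-ID resolution, driven by the spec's "ids" flag, reduces to a lookup in the header-derived column-name map followed by a sort; all column names and values below are invented for illustration:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SpecResolutionSketch {
    public static void main(String[] args) {
        // header-derived mapping from column name to 1-based column ID
        Map<String, Integer> colNamesToIds = new HashMap<>();
        colNamesToIds.put("age", 1);
        colNamesToIds.put("zipcode", 2);
        colNamesToIds.put("income", 3);

        boolean byPositions = false;                   // value of the "ids" flag in the input spec
        Object[] recodeEntries = { "zipcode", "age" }; // names, or Integer positions when byPositions == true

        int[] rcdList = new int[recodeEntries.length];
        for (int i = 0; i < recodeEntries.length; i++)
            rcdList[i] = byPositions
                ? ((Integer) recodeEntries[i]).intValue()
                : colNamesToIds.get((String) recodeEntries[i]);
        Arrays.sort(rcdList);                          // downstream jobs expect sorted column IDs
        System.out.println(Arrays.toString(rcdList));  // [1, 2]
    }
}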
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java b/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
deleted file mode 100644
index c35e160..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/DistinctValue.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-public class DistinctValue implements Writable, Serializable {
-
-    private static final long serialVersionUID = -8236705946336974836L;
-
-    private static final byte [] EMPTY_BYTES = new byte[0];
-
-    // word (distinct value)
-    private byte[] _bytes;
-    private int _length;
-    // count
-    private long _count;
-
-    public DistinctValue() {
-        _bytes = EMPTY_BYTES;
-        _length = 0;
-        _count = -1;
-    }
-
-    public DistinctValue(String w, long count) throws CharacterCodingException {
-        ByteBuffer bb = Text.encode(w, true);
-        _bytes = bb.array();
-        _length = bb.limit();
-        _count = count;
-    }
-
-    public DistinctValue(OffsetCount oc) throws CharacterCodingException {
-        this(oc.filename + "," + oc.fileOffset, oc.count);
-    }
-
-    public void reset() {
-        _bytes = EMPTY_BYTES;
-        _length = 0;
-        _count = -1;
-    }
-
-    public String getWord() {
-        return new String( _bytes, 0, _length, Charset.forName("UTF-8") );
-    }
-
-    public long getCount() {
-        return _count;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        // write word
-        WritableUtils.writeVInt(out, _length);
-        out.write(_bytes, 0, _length);
-        // write count
-        out.writeLong(_count);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        // read word
-        _length = WritableUtils.readVInt(in);
-        _bytes = new byte[_length];
-        in.readFully(_bytes, 0, _length);
-        // read count
-        _count = in.readLong();
-    }
-
-    public OffsetCount getOffsetCount() {
-        String[] parts = getWord().split(",");
-        return new OffsetCount( parts[0],
-            UtilFunctions.parseToLong(parts[1]),
-            getCount() );
-    }
-}
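The write/readFields pair in the deleted DistinctValue above defines a simple wire format: a VInt-prefixed UTF-8 word followed by an 8-byte count. The round-trip sketch below is an assumption-laden illustration, not SystemML code: the class and method names are hypothetical, and it substitutes String.getBytes(UTF-8) for Text.encode while reusing Hadoop's WritableUtils as the original does.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.WritableUtils;

public class DistinctValueWireFormatSketch {
    static void writeEntry(DataOutput out, String word, long count) throws IOException {
        byte[] bytes = word.getBytes(StandardCharsets.UTF_8);
        WritableUtils.writeVInt(out, bytes.length); // variable-length word size
        out.write(bytes, 0, bytes.length);          // word payload
        out.writeLong(count);                       // fixed 8-byte count
    }

    static Object[] readEntry(DataInput in) throws IOException {
        int len = WritableUtils.readVInt(in);
        byte[] bytes = new byte[len];
        in.readFully(bytes, 0, len);
        return new Object[]{ new String(bytes, StandardCharsets.UTF_8), in.readLong() };
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        writeEntry(new DataOutputStream(bos), "CA", 42L);
        Object[] back = readEntry(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(back[0] + " -> " + back[1]); // CA -> 42
    }
}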
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java deleted file mode 100644 index 676b31e..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java +++ /dev/null @@ -1,461 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.regex.Pattern; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.sysml.runtime.matrix.data.FrameBlock; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.transform.encode.Encoder; -import org.apache.sysml.runtime.transform.meta.TfMetaUtils; -import org.apache.sysml.runtime.util.UtilFunctions; -import org.apache.wink.json4j.JSONException; -import org.apache.wink.json4j.JSONObject; - -public class DummycodeAgent extends Encoder -{ - private static final long serialVersionUID = 5832130477659116489L; - - private HashMap<Integer, HashMap<String,String>> _finalMaps = null; - private HashMap<Integer, HashMap<String,Long>> _finalMapsCP = null; - private int[] _binList = null; - private int[] _numBins = null; - - private int[] _domainSizes = null; // length = #of dummycoded columns - private int[] _dcdColumnMap = null; // to help in translating between original and dummycoded column IDs - private long _dummycodedLength = 0; // #of columns after dummycoded - - public DummycodeAgent(JSONObject parsedSpec, String[] colnames, int clen) throws JSONException { - super(null, clen); - - if ( parsedSpec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) { - int[] collist = TfMetaUtils.parseJsonIDList(parsedSpec, colnames, TfUtils.TXMETHOD_DUMMYCODE); - initColList(collist); - } - } - - @Override - public int getNumCols() { - return (int)_dummycodedLength; - } - - /** - * Method to output transformation metadata from the mappers. - * This information is collected and merged by the reducers. 
- */ - @Override - public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { - // There is no metadata required for dummycode. - // Required information is output from RecodeAgent. - return; - } - - @Override - public void mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, - String outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException { - // Nothing to do here - } - - public void setRecodeMaps(HashMap<Integer, HashMap<String,String>> maps) { - _finalMaps = maps; - } - - public void setRecodeMapsCP(HashMap<Integer, HashMap<String,Long>> maps) { - _finalMapsCP = maps; - } - - public void setNumBins(int[] binList, int[] numbins) { - _binList = binList; - _numBins = numbins; - } - - /** - * Method to generate dummyCodedMaps.csv, with the range of column IDs for each variable in the original data. - * - * Each line in dummyCodedMaps.csv file is of the form: [ColID, 1/0, st, end] - * 1/0 indicates if ColID is dummycoded or not - * [st,end] is the range of dummycoded column numbers for the given ColID - * - * It also generates coltypes.csv, with the type (scale, nominal, etc.) of columns in the output. - * Recoded columns are of type nominal, binner columns are of type ordinal, dummycoded columns are of type - * dummycoded, and the remaining are of type scale. - * - * @param fs file system - * @param txMtdDir path to transform metadata directory - * @param numCols number of columns - * @param agents ? - * @return ? - * @throws IOException if IOException occurs - */ - public int genDcdMapsAndColTypes(FileSystem fs, String txMtdDir, int numCols, TfUtils agents) throws IOException { - - // initialize all column types in the transformed data to SCALE - TfUtils.ColumnTypes[] ctypes = new TfUtils.ColumnTypes[(int) _dummycodedLength]; - for(int i=0; i < _dummycodedLength; i++) - ctypes[i] = TfUtils.ColumnTypes.SCALE; - - _dcdColumnMap = new int[numCols]; - - int sum=1; - try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter( - fs.create(new Path(txMtdDir+"/Dummycode/" + TfUtils.DCD_FILE_NAME),true))) ) { - int idx = 0; - for(int colID=1; colID <= numCols; colID++) - { - if ( _colList != null && idx < _colList.length && _colList[idx] == colID ) - { - br.write(colID + TfUtils.TXMTD_SEP + "1" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + (sum+_domainSizes[idx]-1) + "\n"); - _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1; - - for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++) - ctypes[i-1] = TfUtils.ColumnTypes.DUMMYCODED; - - sum += _domainSizes[idx]; - idx++; - } - else - { - br.write(colID + TfUtils.TXMTD_SEP + "0" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + sum + "\n"); - _dcdColumnMap[colID-1] = sum-1; - - if ( agents.getBinAgent().isApplicable(colID) != -1 ) - ctypes[sum-1] = TfUtils.ColumnTypes.ORDINAL; // binned variable results in an ordinal column - - if ( agents.getRecodeAgent().isApplicable(colID) != -1 ) - ctypes[sum-1] = TfUtils.ColumnTypes.NOMINAL; - - sum += 1; - } - } - } - - // Write coltypes.csv - try(BufferedWriter br=new BufferedWriter(new OutputStreamWriter( - fs.create(new Path(txMtdDir + File.separator + TfUtils.TXMTD_COLTYPES),true))) ) { - br.write(ctypes[0].toID() + ""); - for(int i = 1; i < _dummycodedLength; i++) - br.write( TfUtils.TXMTD_SEP + ctypes[i].toID() ); - } - - return sum-1; - } - - /** - * Given a dummycoded column id, find the corresponding original column ID. 
- * - * @param colID dummycoded column ID - * @return original column ID, -1 if not found - */ - public int mapDcdColumnID(int colID) - { - for(int i=0; i < _dcdColumnMap.length; i++) - { - int st = (i==0 ? 1 : _dcdColumnMap[i-1]+1+1); - int end = _dcdColumnMap[i]+1; - //System.out.println((i+1) + ": " + "[" + st + "," + end + "]"); - - if ( colID >= st && colID <= end) - return i+1; - } - return -1; - } - - public String constructDummycodedHeader(String header, Pattern delim) { - - if(_colList == null && _binList == null ) - // none of the columns are dummycoded, simply return the given header - return header; - - String[] names = delim.split(header, -1); - List<String> newNames = null; - - StringBuilder sb = new StringBuilder(); - - // Dummycoding can be performed on either on a recoded column or on a binned column - - // process recoded columns - if(_finalMapsCP != null && _colList != null) - { - for(int i=0; i <_colList.length; i++) - { - int colID = _colList[i]; - HashMap<String,Long> map = _finalMapsCP.get(colID); - String colName = UtilFunctions.unquote(names[colID-1]); - - if ( map != null ) - { - // order map entries by their recodeID - List<Map.Entry<String, Long>> entryList = new ArrayList<Map.Entry<String, Long>>(map.entrySet()); - Comparator<Map.Entry<String, Long>> comp = new Comparator<Map.Entry<String, Long>>() { - @Override - public int compare(Entry<String, Long> entry1, Entry<String, Long> entry2) { - Long value1 = entry1.getValue(); - Long value2 = entry2.getValue(); - return (int) (value1 - value2); - } - }; - Collections.sort(entryList, comp); - newNames = new ArrayList<String>(); - for (Entry<String, Long> entry : entryList) { - newNames.add(entry.getKey()); - } - - // construct concatenated string of map entries - sb.setLength(0); - for(int idx=0; idx < newNames.size(); idx++) - { - if(idx==0) - sb.append( colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); - else - sb.append( delim + colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); - } - names[colID-1] = sb.toString(); // replace original column name with dcd name - } - } - } - else if(_finalMaps != null && _colList != null) { - for(int i=0; i <_colList.length; i++) { - int colID = _colList[i]; - HashMap<String,String> map = _finalMaps.get(colID); - String colName = UtilFunctions.unquote(names[colID-1]); - - if ( map != null ) - { - - // order map entries by their recodeID (represented as Strings .. "1", "2", etc.) 
- List<Map.Entry<String, String>> entryList = new ArrayList<Map.Entry<String, String>>(map.entrySet()); - Comparator<Map.Entry<String, String>> comp = new Comparator<Map.Entry<String, String>>() { - @Override - public int compare(Entry<String, String> entry1, Entry<String, String> entry2) { - String value1 = entry1.getValue(); - String value2 = entry2.getValue(); - return (Integer.parseInt(value1) - Integer.parseInt(value2)); - } - }; - Collections.sort(entryList, comp); - newNames = new ArrayList<String>(); - for (Entry<String, String> entry : entryList) { - newNames.add(entry.getKey()); - } - - // construct concatenated string of map entries - sb.setLength(0); - for(int idx=0; idx < newNames.size(); idx++) - { - if(idx==0) - sb.append( colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); - else - sb.append( delim + colName + TfUtils.DCD_NAME_SEP + newNames.get(idx)); - } - names[colID-1] = sb.toString(); // replace original column name with dcd name - } - } - } - - // process binned columns - if (_binList != null) - for(int i=0; i < _binList.length; i++) - { - int colID = _binList[i]; - - // need to consider only binned and dummycoded columns - if(isApplicable(colID) == -1) - continue; - - int numBins = _numBins[i]; - String colName = UtilFunctions.unquote(names[colID-1]); - - sb.setLength(0); - for(int idx=0; idx < numBins; idx++) - if(idx==0) - sb.append( colName + TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) ); - else - sb.append( delim + colName + TfUtils.DCD_NAME_SEP + "Bin" + (idx+1) ); - names[colID-1] = sb.toString(); // replace original column name with dcd name - } - - // Construct the full header - sb.setLength(0); - for(int colID=0; colID < names.length; colID++) - { - if (colID == 0) - sb.append(names[colID]); - else - sb.append(delim + names[colID]); - } - //System.out.println("DummycodedHeader: " + sb.toString()); - - return sb.toString(); - } - - @Override - public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, TfUtils agents) throws IOException { - if ( !isApplicable() ) { - _dummycodedLength = _clen; - return; - } - - // sort to-be dummycoded column IDs in ascending order. This is the order in which the new dummycoded record is constructed in apply() function. - Arrays.sort(_colList); - _domainSizes = new int[_colList.length]; - - _dummycodedLength = _clen; - - //HashMap<String, String> map = null; - for(int i=0; i<_colList.length; i++) { - int colID = _colList[i]; - - // Find the domain size for colID using _finalMaps or _finalMapsCP - int domainSize = 0; - if(_finalMaps != null) { - if(_finalMaps.get(colID) != null) - domainSize = _finalMaps.get(colID).size(); - } - else { - if(_finalMapsCP.get(colID) != null) - domainSize = _finalMapsCP.get(colID).size(); - } - - if ( domainSize != 0 ) { - // dummycoded column - _domainSizes[i] = domainSize; - } - else { - // binned column - if ( _binList != null ) - for(int j=0; j<_binList.length; j++) { - if (colID == _binList[j]) { - _domainSizes[i] = _numBins[j]; - break; - } - } - } - _dummycodedLength += _domainSizes[i]-1; - } - } - - - @Override - public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - return apply(in, out); - } - - @Override - public void build(FrameBlock in) { - //do nothing - } - - /** - * Method to apply transformations. 
- * - * @param words array of strings - * @return array of transformed strings - */ - @Override - public String[] apply(String[] words) - { - if( !isApplicable() ) - return words; - - String[] nwords = new String[(int)_dummycodedLength]; - int rcdVal = 0; - - for(int colID=1, idx=0, ncolID=1; colID <= words.length; colID++) { - if(idx < _colList.length && colID==_colList[idx]) { - // dummycoded columns - try { - rcdVal = UtilFunctions.parseToInt(UtilFunctions.unquote(words[colID-1])); - nwords[ ncolID-1+rcdVal-1 ] = "1"; - ncolID += _domainSizes[idx]; - idx++; - } - catch (Exception e) { - throw new RuntimeException("Error in dummycoding: colID="+colID + ", rcdVal=" + rcdVal+", word="+words[colID-1] - + ", domainSize=" + _domainSizes[idx] + ", dummyCodedLength=" + _dummycodedLength); - } - } - else { - nwords[ncolID-1] = words[colID-1]; - ncolID++; - } - } - - return nwords; - } - - @Override - public MatrixBlock apply(FrameBlock in, MatrixBlock out) - { - MatrixBlock ret = new MatrixBlock(out.getNumRows(), (int)_dummycodedLength, false); - - for( int i=0; i<out.getNumRows(); i++ ) { - for(int colID=1, idx=0, ncolID=1; colID <= out.getNumColumns(); colID++) { - double val = out.quickGetValue(i, colID-1); - if(idx < _colList.length && colID==_colList[idx]) { - ret.quickSetValue(i, ncolID-1+(int)val-1, 1); - ncolID += _domainSizes[idx]; - idx++; - } - else { - double ptval = UtilFunctions.objectToDouble(in.getSchema()[colID-1], in.get(i, colID-1)); - ret.quickSetValue(i, ncolID-1, ptval); - ncolID++; - } - } - } - - return ret; - } - - @Override - public FrameBlock getMetaData(FrameBlock out) { - return out; - } - - @Override - public void initMetaData(FrameBlock meta) { - //initialize domain sizes and output num columns - _domainSizes = new int[_colList.length]; - _dummycodedLength = _clen; - for( int j=0; j<_colList.length; j++ ) { - int colID = _colList[j]; //1-based - _domainSizes[j] = (int)meta.getColumnMetadata()[colID-1].getNumDistinct(); - _dummycodedLength += _domainSizes[j]-1; - } - } -} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java deleted file mode 100644 index 1e45036..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDMapper.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; - -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.wink.json4j.JSONException; -import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; -import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; - - -public class GTFMTDMapper implements Mapper<LongWritable, Text, IntWritable, DistinctValue>{ - - private OutputCollector<IntWritable, DistinctValue> _collector = null; - private int _mapTaskID = -1; - - TfUtils _agents = null; - - private boolean _partFileWithHeader = false; - private boolean _firstRecordInSplit = true; - private String _partFileName = null; - private long _offsetInPartFile = -1; - - // ---------------------------------------------------------------------------------------------- - - /** - * Configure the information used in the mapper, and setup transformation agents. - */ - @Override - public void configure(JobConf job) { - String[] parts = job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID).split("_"); - if ( parts[0].equalsIgnoreCase("task")) { - _mapTaskID = Integer.parseInt(parts[parts.length-1]); - } - else if ( parts[0].equalsIgnoreCase("attempt")) { - _mapTaskID = Integer.parseInt(parts[parts.length-2]); - } - else { - throw new RuntimeException("Unrecognized format for taskID: " + job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID)); - } - - try { - _partFileName = TfUtils.getPartFileName(job); - _partFileWithHeader = TfUtils.isPartFileWithHeader(job); - _agents = new TfUtils(job); - } catch(IOException e) { throw new RuntimeException(e); } - catch(JSONException e) { throw new RuntimeException(e); } - - } - - - public void map(LongWritable rawKey, Text rawValue, OutputCollector<IntWritable, DistinctValue> out, Reporter reporter) throws IOException { - - if(_firstRecordInSplit) - { - _firstRecordInSplit = false; - _collector = out; - _offsetInPartFile = rawKey.get(); - } - - // ignore header - if (_agents.hasHeader() && rawKey.get() == 0 && _partFileWithHeader) - return; - - _agents.prepareTfMtd(rawValue.toString()); - } - - @Override - public void close() throws IOException { - _agents.getMVImputeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); - _agents.getRecodeAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); - _agents.getBinAgent().mapOutputTransformationMetadata(_collector, _mapTaskID, _agents); - - // Output part-file offsets to create OFFSETS_FILE, which is to be used in csv reblocking. - // OffsetCount is denoted as a DistinctValue by concatenating parfile name and offset within partfile. - if(_collector != null) { - IntWritable key = new IntWritable((int)_agents.getNumCols()+1); - DistinctValue val = new DistinctValue(new OffsetCount(_partFileName, _offsetInPartFile, _agents.getValid())); - _collector.collect(key, val); - } - - // reset global variables, required when the jvm is reused. 
- _firstRecordInSplit = true; - _offsetInPartFile = -1; - _partFileWithHeader = false; - } - -} http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java deleted file mode 100644 index 01fc784..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.wink.json4j.JSONException; -import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; - - -public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, LongWritable> { - - private JobConf _rJob = null; - TfUtils _agents = null; - - @Override - public void configure(JobConf job) { - _rJob = job; - - try { - String outputDir = MRJobConfiguration.getOutputs(job)[0]; - _agents = new TfUtils(job, outputDir); - } - catch(IOException e) { throw new RuntimeException(e); } - catch(JSONException e) { throw new RuntimeException(e); } - } - - @Override - public void close() throws IOException { - } - - @Override - public void reduce(IntWritable key, Iterator<DistinctValue> values, - OutputCollector<Text, LongWritable> output, Reporter reporter) - throws IOException { - - FileSystem fs = FileSystem.get(_rJob); - - int colID = key.get(); - - if(colID < 0) - { - // process mapper output for MV and Bin agents - colID = colID*-1; - _agents.getMVImputeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents); - } - else if ( colID == _agents.getNumCols() + 1) - { - // process mapper output for OFFSET_FILE - ArrayList<OffsetCount> list = new ArrayList<OffsetCount>(); - while(values.hasNext()) - list.add(new OffsetCount(values.next().getOffsetCount())); - - long numTfRows = generateOffsetsFile(list); - 
reporter.incrCounter(MRJobConfiguration.DataTransformCounters.TRANSFORMED_NUM_ROWS, numTfRows); - - } - else - { - // process mapper output for Recode agent - _agents.getRecodeAgent().mergeAndOutputTransformationMetadata(values, _agents.getTfMtdDir(), colID, fs, _agents); - } - - } - - @SuppressWarnings({"unchecked","deprecation"}) - private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException - { - Collections.sort(list); - - SequenceFile.Writer writer = null; - long lineOffset=0; - try { - writer = new SequenceFile.Writer( - FileSystem.get(_rJob), _rJob, - new Path(_agents.getOffsetFile()+"/part-00000"), - ByteWritable.class, OffsetCount.class); - - for(OffsetCount oc: list) - { - long count=oc.count; - oc.count=lineOffset; - writer.append(new ByteWritable((byte)0), oc); - lineOffset+=count; - } - } - finally { - IOUtilFunctions.closeSilently(writer); - } - list.clear(); - return lineOffset; - } - -} -
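
The removed GTFMTDMapper/GTFMTDReducer pair relied on a simple key convention on the shuffle: negative column IDs carry missing-value/binning metadata, the key numCols+1 carries part-file offsets for the subsequent CSV reblock, and any other positive key carries recode metadata. Below is a standalone sketch of that dispatch, not part of the removed job; numCols and the sample keys are made-up values.

public class TfMtdKeyDispatchSketch {
    public static void main(String[] args) {
        int numCols = 4; // hypothetical number of input columns
        for (int key : new int[]{ -2, 3, numCols + 1 }) {
            if (key < 0)
                System.out.println(key + " -> MV/Bin metadata for column " + (-key));
            else if (key == numCols + 1)
                System.out.println(key + " -> part-file offsets for CSV reblock");
            else
                System.out.println(key + " -> recode metadata for column " + key);
        }
    }
}
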
