[SYSTEMML-583] New transformmeta builtin (meta read util wrapper), tests Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e5aaaf1e Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e5aaaf1e Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e5aaaf1e
Branch: refs/heads/master Commit: e5aaaf1e82e1c856116365cbfcd85fcae4aa12fb Parents: 14e898a Author: Matthias Boehm <[email protected]> Authored: Fri Apr 22 23:03:08 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Apr 22 23:03:08 2016 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/api/jmlc/Connection.java | 184 +-------------- src/main/java/org/apache/sysml/hops/Hop.java | 5 +- .../sysml/hops/ParameterizedBuiltinOp.java | 16 +- .../apache/sysml/lops/ParameterizedBuiltin.java | 21 +- .../org/apache/sysml/parser/DMLTranslator.java | 7 + .../org/apache/sysml/parser/Expression.java | 2 +- .../ParameterizedBuiltinFunctionExpression.java | 26 ++ .../instructions/CPInstructionParser.java | 1 + .../cp/ParameterizedBuiltinCPInstruction.java | 23 +- .../runtime/transform/meta/TfMetaUtils.java | 236 +++++++++++++++++++ .../transform/TransformReadMetaTest.java | 168 +++++++++++++ .../functions/transform/TransformReadMeta.dml | 33 +++ .../transform/TransformReadMetaSpecX.json | 5 + .../functions/transform/ZPackageSuite.java | 1 + 14 files changed, 525 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/api/jmlc/Connection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java index 6a8dc45..54f7e44 100644 --- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java +++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java @@ -20,21 +20,13 @@ package org.apache.sysml.api.jmlc; import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.File; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import 
java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.sysml.api.DMLException; @@ -46,12 +38,10 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.rewrite.ProgramRewriter; import org.apache.sysml.hops.rewrite.RewriteRemovePersistentReadWrite; -import org.apache.sysml.lops.Lop; import org.apache.sysml.parser.AParserWrapper; import org.apache.sysml.parser.DMLProgram; import org.apache.sysml.parser.DMLTranslator; import org.apache.sysml.parser.DataExpression; -import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.Program; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; @@ -65,13 +55,9 @@ import org.apache.sysml.runtime.io.ReaderTextCell; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.TfUtils; -import org.apache.sysml.runtime.transform.decode.DecoderRecode; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.DataConverter; -import org.apache.sysml.runtime.util.MapReduceTool; -import org.apache.sysml.runtime.util.UtilFunctions; -import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONObject; /** @@ -103,9 +89,7 @@ import org.apache.wink.json4j.JSONObject; * </ul> */ public class Connection -{ - private static final Log LOG = LogFactory.getLog(Connection.class.getName()); - +{ private DMLConfig _dmlconf = null; /** @@ -599,44 +583,8 @@ 
public class Connection * @return FrameBlock object representing transform metadata * @throws IOException */ - public FrameBlock readTransformMetaDataFromFile(String spec, String metapath, String colDelim) - throws IOException - { - //NOTE: this implementation assumes column alignment of colnames and coltypes - - //read column types (for sanity check column names) - String coltypesStr = MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLTYPES); - List<String> coltypes = Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP)); - - //read column names - String colnamesStr = MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLNAMES); - List<String> colnames = Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim)); - if( coltypes.size() != colnames.size() ) { - LOG.warn("Number of columns names: "+colnames.size()+" (expected: "+coltypes.size()+")."); - LOG.warn("--Sample column names: "+(!colnames.isEmpty()?colnames.get(0):"null")); - } - - //read meta data (currently only recode supported, without parsing spec) - Map<String,String> meta = new HashMap<String,String>(); - int rows = 0; - for( int j=0; j<colnames.size(); j++ ) { - String colName = colnames.get(j); - String name = metapath+File.separator+"Recode"+File.separator+colName; - if( MapReduceTool.existsFileOnHDFS(name+TfUtils.TXMTD_RCD_MAP_SUFFIX) ) { - meta.put(colName, MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_MAP_SUFFIX)); - String ndistinct = MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX); - rows = Math.max(rows, Integer.parseInt(ndistinct)); - } - else if( coltypes.get(j).equals("2") ) { - LOG.warn("Recode map for column '"+colName+"' does not exist."); - } - } - - //get list of recode ids - List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes); - - //create frame block from in-memory strings - return convertToTransformMetaDataFrame(rows, recodeIDs, colnames, meta); + 
public FrameBlock readTransformMetaDataFromFile(String spec, String metapath, String colDelim) throws IOException { + return TfMetaUtils.readTransformMetaDataFromFile(spec, metapath, colDelim); } /** @@ -676,127 +624,7 @@ public class Connection * @return FrameBlock object representing transform metadata * @throws IOException */ - public FrameBlock readTransformMetaDataFromPath(String spec, String metapath, String colDelim) - throws IOException - { - //NOTE: this implementation assumes column alignment of colnames and coltypes - - //read column types (for sanity check column names) - String coltypesStr = IOUtilFunctions.toString(Connection.class.getResourceAsStream(metapath+"/"+TfUtils.TXMTD_COLTYPES)); - List<String> coltypes = Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP)); - - //read column names - String colnamesStr = IOUtilFunctions.toString(Connection.class.getResourceAsStream(metapath+"/"+TfUtils.TXMTD_COLNAMES)); - List<String> colnames = Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim)); - if( coltypes.size() != colnames.size() ) { - LOG.warn("Number of columns names: "+colnames.size()+" (expected: "+coltypes.size()+")."); - LOG.warn("--Sample column names: "+(!colnames.isEmpty()?colnames.get(0):"null")); - } - - //read meta data (currently only recode supported, without parsing spec) - Map<String,String> meta = new HashMap<String,String>(); - int rows = 0; - for( int j=0; j<colnames.size(); j++ ) { - String colName = colnames.get(j); - String name = metapath+"/"+"Recode"+"/"+colName; - String map = IOUtilFunctions.toString(Connection.class.getResourceAsStream(name+TfUtils.TXMTD_RCD_MAP_SUFFIX)); - if( map != null ) { - meta.put(colName, map); - String ndistinct = IOUtilFunctions.toString(Connection.class.getResourceAsStream(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX)); - rows = Math.max(rows, Integer.parseInt(ndistinct)); - } - else if( coltypes.get(j).equals("2") ) { - LOG.warn("Recode map for column 
'"+colName+"' does not exist."); - } - } - - //get list of recode ids - List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes); - - //create frame block from in-memory strings - return convertToTransformMetaDataFrame(rows, recodeIDs, colnames, meta); - } - - /** - * Converts transform meta data into an in-memory FrameBlock object. - * - * @param rows - * @param recodeIDs - * @param colnames - * @param meta - * @return - * @throws IOException - */ - private FrameBlock convertToTransformMetaDataFrame(int rows, List<Integer> recodeIDs, List<String> colnames, Map<String,String> meta) - throws IOException - { - //create frame block w/ pure string schema - List<ValueType> schema = Collections.nCopies(colnames.size(), ValueType.STRING); - FrameBlock ret = new FrameBlock(schema, colnames); - ret.ensureAllocatedColumns(rows); - - //encode recode maps into frame - for( Integer colID : recodeIDs ) { - String name = colnames.get(colID-1); - String map = meta.get(name); - if( map == null ) - throw new IOException("Recode map for column '"+name+"' (id="+colID+") not existing."); - - InputStream is = new ByteArrayInputStream(map.getBytes("UTF-8")); - BufferedReader br = new BufferedReader(new InputStreamReader(is)); - Pair<String,String> pair = new Pair<String,String>(); - String line; int rpos = 0; - while( (line = br.readLine()) != null ) { - DecoderRecode.parseRecodeMapEntry(line, pair); - String tmp = pair.getKey() + Lop.DATATYPE_PREFIX + pair.getValue(); - ret.set(rpos++, colID-1, tmp); - } - } - - return ret; - } - - /** - * Parses the given json specification and extracts a list of column ids - * that are subject to recoding. 
- * - * @param spec - * @param coltypes - * @return - * @throws IOException - */ - private ArrayList<Integer> parseRecodeColIDs(String spec, List<String> coltypes) - throws IOException - { - ArrayList<Integer> specRecodeIDs = new ArrayList<Integer>(); - - try { - if( spec != null ) { - //parse json transform specification for recode col ids - JSONObject jSpec = new JSONObject(spec); - if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { - JSONArray attrs = null; //TODO simplify once json spec consolidated - if( jSpec.get(TfUtils.TXMETHOD_RECODE) instanceof JSONObject ) { - JSONObject obj = (JSONObject) jSpec.get(TfUtils.TXMETHOD_RECODE); - attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); - } - else - attrs = (JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE); - for(int j=0; j<attrs.length(); j++) - specRecodeIDs.add(UtilFunctions.toInt(attrs.get(j))); - } - } - else { - //obtain recode col ids from coltypes - for( int j=0; j<coltypes.size(); j++ ) - if( coltypes.get(j).equals("2") ) - specRecodeIDs.add(j+1); - } - } - catch(Exception ex) { - throw new IOException(ex); - } - - return specRecodeIDs; + public FrameBlock readTransformMetaDataFromPath(String spec, String metapath, String colDelim) throws IOException { + return TfMetaUtils.readTransformMetaDataFromPath(spec, metapath, colDelim); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/hops/Hop.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java index fa45ed3..f5b0fe6 100644 --- a/src/main/java/org/apache/sysml/hops/Hop.java +++ b/src/main/java/org/apache/sysml/hops/Hop.java @@ -1089,7 +1089,7 @@ public abstract class Hop public enum ParamBuiltinOp { INVALID, CDF, INVCDF, GROUPEDAGG, RMEMPTY, REPLACE, REXPAND, - TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, + TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA, }; /** @@ -1332,7 
+1332,8 @@ public abstract class Hop HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REXPAND, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REXPAND); HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORM, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM); HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMAPPLY, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMAPPLY); - HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMDECODE, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMDECODE); + HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMDECODE, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMDECODE); + HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMMETA, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMMETA); } protected static final HashMap<Hop.OpOp2, String> HopsOpOp2String; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java index 4a9eef9..9217c77 100644 --- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java +++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java @@ -197,7 +197,8 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop case INVCDF: case REPLACE: case TRANSFORMAPPLY: - case TRANSFORMDECODE: { + case TRANSFORMDECODE: + case TRANSFORMMETA: { ExecType et = optFindExecType(); ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops, HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et); @@ -994,11 +995,7 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop { if( _op == ParamBuiltinOp.TRANSFORM ) { // force remote, at 
runtime cp transform triggered for small files. - return REMOTE; - } - else if( _op == ParamBuiltinOp.TRANSFORMAPPLY - || _op == ParamBuiltinOp.TRANSFORMDECODE ) { - return ExecType.CP; + return (_etype = REMOTE); } if ( OptimizerUtils.isMemoryBasedOptLevel() ) { @@ -1018,6 +1015,13 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop checkAndSetInvalidCPDimsAndSize(); } + //force CP for in-memory only transform builtins + if( _op == ParamBuiltinOp.TRANSFORMAPPLY + || _op == ParamBuiltinOp.TRANSFORMDECODE + || _op == ParamBuiltinOp.TRANSFORMMETA ) { + _etype = ExecType.CP; + } + //mark for recompile (forever) if( ConfigurationManager.isDynamicRecompilation() && !dimsKnown(true) && _etype==REMOTE ) setRequiresRecompile(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java index d090a0a..0096de8 100644 --- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java +++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java @@ -41,7 +41,7 @@ public class ParameterizedBuiltin extends Lop public enum OperationTypes { INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, PEXP, QEXP, - TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, + TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA, }; private OperationTypes _operation; @@ -241,24 +241,15 @@ public class ParameterizedBuiltin extends Lop break; - case TRANSFORM: { - sb.append("transform"); + case TRANSFORM: + case TRANSFORMAPPLY: + case TRANSFORMDECODE: + case TRANSFORMMETA: { + sb.append(_operation.toString().toLowerCase()); //opcode sb.append(OPERAND_DELIMITOR); sb.append(compileGenericParamMap(_inputParams)); break; } - case TRANSFORMAPPLY: { - 
sb.append("transformapply"); - sb.append(OPERAND_DELIMITOR); - sb.append(compileGenericParamMap(_inputParams)); - break; - } - case TRANSFORMDECODE: { - sb.append("transformdecode"); - sb.append(OPERAND_DELIMITOR); - sb.append(compileGenericParamMap(_inputParams)); - break; - } default: throw new LopsException(this.printErrorLocation() + "In ParameterizedBuiltin Lop, Unknown operation: " + _operation); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/parser/DMLTranslator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java index 87ad226..b2b91c9 100644 --- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java +++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java @@ -2034,6 +2034,13 @@ public class DMLTranslator paramHops); break; + + case TRANSFORMMETA: + currBuiltinOp = new ParameterizedBuiltinOp( + target.getName(), target.getDataType(), + target.getValueType(), ParamBuiltinOp.TRANSFORMMETA, + paramHops); + break; default: http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/parser/Expression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/Expression.java b/src/main/java/org/apache/sysml/parser/Expression.java index 2351901..e080d76 100644 --- a/src/main/java/org/apache/sysml/parser/Expression.java +++ b/src/main/java/org/apache/sysml/parser/Expression.java @@ -126,7 +126,7 @@ public abstract class Expression GROUPEDAGG, RMEMPTY, REPLACE, ORDER, // Distribution Functions CDF, INVCDF, PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, PEXP, QEXP, - TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMENCODE, + TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMENCODE, TRANSFORMMETA, INVALID }; 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java index e73a7c5..ef1745f 100644 --- a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java +++ b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java @@ -68,6 +68,7 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier opcodeMap.put("transformapply", Expression.ParameterizedBuiltinFunctionOp.TRANSFORMAPPLY); opcodeMap.put("transformdecode", Expression.ParameterizedBuiltinFunctionOp.TRANSFORMDECODE); opcodeMap.put("transformencode", Expression.ParameterizedBuiltinFunctionOp.TRANSFORMENCODE); + opcodeMap.put("transformmeta", Expression.ParameterizedBuiltinFunctionOp.TRANSFORMMETA); } public static HashMap<Expression.ParameterizedBuiltinFunctionOp, ParamBuiltinOp> pbHopMap; @@ -240,6 +241,10 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier validateTransformDecode(output, conditional); break; + case TRANSFORMMETA: + validateTransformMeta(output, conditional); + break; + default: //always unconditional (because unsupported operation) raiseValidateError("Unsupported parameterized function "+ getOpCode(), false, LanguageErrorCodes.INVALID_PARAMETERS); } @@ -373,6 +378,27 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier * @param conditional * @throws LanguageException */ + private void validateTransformMeta(DataIdentifier output, boolean conditional) + throws LanguageException + { + //validate specification + checkDataValueType("transformmeta", TF_FN_PARAM_SPEC, DataType.SCALAR, ValueType.STRING, conditional); + + //validate meta data path + 
checkDataValueType("transformmeta", TF_FN_PARAM_MTD, DataType.SCALAR, ValueType.STRING, conditional); + + //set output dimensions + output.setDataType(DataType.FRAME); + output.setValueType(ValueType.STRING); + output.setDimensions(-1, -1); + } + + /** + * + * @param output + * @param conditional + * @throws LanguageException + */ private void validateTransformEncode(DataIdentifier output1, DataIdentifier output2, boolean conditional) throws LanguageException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java index f3ac621..5c3f591 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java @@ -184,6 +184,7 @@ public class CPInstructionParser extends InstructionParser String2CPInstructionType.put( "transformapply",CPINSTRUCTION_TYPE.ParameterizedBuiltin); String2CPInstructionType.put( "transformdecode",CPINSTRUCTION_TYPE.ParameterizedBuiltin); String2CPInstructionType.put( "transformencode",CPINSTRUCTION_TYPE.MultiReturnParameterizedBuiltin); + String2CPInstructionType.put( "transformmeta",CPINSTRUCTION_TYPE.ParameterizedBuiltin); // Variable Instruction Opcodes String2CPInstructionType.put( "assignvar" , CPINSTRUCTION_TYPE.Variable); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java index 5f6ac5f..a96c2b6 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.instructions.cp; import java.util.HashMap; import org.apache.sysml.lops.Lop; +import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression; import org.apache.sysml.parser.Statement; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; @@ -38,8 +39,10 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.SimpleOperator; import org.apache.sysml.runtime.transform.DataTransform; +import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.transform.decode.Decoder; import org.apache.sysml.runtime.transform.decode.DecoderFactory; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction @@ -124,7 +127,8 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction } else if ( opcode.equals("transform") || opcode.equals("transformapply") - || opcode.equals("transformdecode")) + || opcode.equals("transformdecode") + || opcode.equals("transformmeta")) { return new ParameterizedBuiltinCPInstruction(null, paramsMap, out, opcode, str); } @@ -266,6 +270,23 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction ec.releaseMatrixInput(params.get("target")); ec.releaseFrameInput(params.get("meta")); } + else if ( opcode.equalsIgnoreCase("transformmeta")) { + //get input spec and path + String spec = getParameterMap().get("spec"); + String path = 
getParameterMap().get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_MTD); + + //execute transform meta data read + FrameBlock meta = null; + try { + meta = TfMetaUtils.readTransformMetaDataFromFile(spec, path, TfUtils.TXMTD_SEP); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + + //release locks + ec.setFrameOutput(output.getName(), meta); + } else { throw new DMLRuntimeException("Unknown opcode : " + opcode); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java new file mode 100644 index 0000000..2536b23 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.transform.meta; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysml.api.jmlc.Connection; +import org.apache.sysml.lops.Lop; +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.io.IOUtilFunctions; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.decode.DecoderRecode; +import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.runtime.util.UtilFunctions; +import org.apache.wink.json4j.JSONArray; +import org.apache.wink.json4j.JSONObject; + +public class TfMetaUtils +{ + private static final Log LOG = LogFactory.getLog(TfMetaUtils.class.getName()); + + /** + * Reads transform meta data from an HDFS file path and converts it into an in-memory + * FrameBlock object. 
+ * + * @param spec transform specification as json string + * @param metapath hdfs file path to meta data directory + * @param colDelim separator for processing column names in the meta data file 'column.names' + * @return FrameBlock object representing transform metadata + * @throws IOException + */ + public static FrameBlock readTransformMetaDataFromFile(String spec, String metapath, String colDelim) + throws IOException + { + //NOTE: this implementation assumes column alignment of colnames and coltypes + + //read column types (for sanity check column names) + String coltypesStr = MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLTYPES); + List<String> coltypes = Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP)); + + //read column names + String colnamesStr = MapReduceTool.readStringFromHDFSFile(metapath+File.separator+TfUtils.TXMTD_COLNAMES); + List<String> colnames = Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim)); + if( coltypes.size() != colnames.size() ) { + LOG.warn("Number of columns names: "+colnames.size()+" (expected: "+coltypes.size()+")."); + LOG.warn("--Sample column names: "+(!colnames.isEmpty()?colnames.get(0):"null")); + } + + //read meta data (currently only recode supported, without parsing spec) + HashMap<String,String> meta = new HashMap<String,String>(); + int rows = 0; + for( int j=0; j<colnames.size(); j++ ) { + String colName = colnames.get(j); + String name = metapath+File.separator+"Recode"+File.separator+colName; + if( MapReduceTool.existsFileOnHDFS(name+TfUtils.TXMTD_RCD_MAP_SUFFIX) ) { + meta.put(colName, MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_MAP_SUFFIX)); + String ndistinct = MapReduceTool.readStringFromHDFSFile(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX); + rows = Math.max(rows, Integer.parseInt(ndistinct)); + } + else if( coltypes.get(j).equals("2") ) { + LOG.warn("Recode map for column '"+colName+"' does not exist."); + } + } + + //get list 
of recode ids
+		List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes);
+		
+		//create frame block from in-memory strings
+		return convertToTransformMetaDataFrame(rows, recodeIDs, colnames, meta);
+	}
+
+	/**
+	 * Reads transform meta data from the class path and converts it into an in-memory
+	 * FrameBlock object.
+	 * 
+	 * @param spec transform specification as json string
+	 * @param metapath resource path to meta data directory
+	 * @param colDelim separator for processing column names in the meta data file 'column.names'
+	 * @return FrameBlock object representing transform metadata
+	 * @throws IOException if a required meta data resource (column types, column names,
+	 *     number of distinct values) is missing or malformed, or the spec cannot be parsed
+	 */
+	public static FrameBlock readTransformMetaDataFromPath(String spec, String metapath, String colDelim) 
+		throws IOException
+	{
+		//NOTE: this implementation assumes column alignment of colnames and coltypes
+		
+		//read column types (for sanity check column names)
+		String coltypesStr = readRequiredResource(metapath+"/"+TfUtils.TXMTD_COLTYPES);
+		List<String> coltypes = Arrays.asList(IOUtilFunctions.split(coltypesStr.trim(), TfUtils.TXMTD_SEP));
+		
+		//read column names
+		String colnamesStr = readRequiredResource(metapath+"/"+TfUtils.TXMTD_COLNAMES);
+		List<String> colnames = Arrays.asList(IOUtilFunctions.split(colnamesStr.trim(), colDelim));
+		if( coltypes.size() != colnames.size() ) {
+			LOG.warn("Number of columns names: "+colnames.size()+" (expected: "+coltypes.size()+").");
+			LOG.warn("--Sample column names: "+(!colnames.isEmpty()?colnames.get(0):"null"));
+		}
+		
+		//read meta data (currently only recode supported, without parsing spec)
+		HashMap<String,String> meta = new HashMap<String,String>();
+		int rows = 0;
+		for( int j=0; j<colnames.size(); j++ ) {
+			String colName = colnames.get(j);
+			String name = metapath+"/"+"Recode"+"/"+colName;
+			String map = IOUtilFunctions.toString(Connection.class.getResourceAsStream(name+TfUtils.TXMTD_RCD_MAP_SUFFIX));
+			if( map != null ) {
+				meta.put(colName, map);
+				//number of frame rows = max number of distinct values over all recode maps
+				String ndistinct = readRequiredResource(name+TfUtils.TXMTD_RCD_DISTINCT_SUFFIX);
+				rows = Math.max(rows, parseNumDistinct(ndistinct, colName));
+			}
+			//bounds check: names and types may be misaligned (only warned about above)
+			else if( j < coltypes.size() && coltypes.get(j).equals("2") ) {
+				LOG.warn("Recode map for column '"+colName+"' does not exist.");
+			}
+		}
+		
+		//get list of recode ids
+		List<Integer> recodeIDs = parseRecodeColIDs(spec, coltypes);
+		
+		//create frame block from in-memory strings
+		return convertToTransformMetaDataFrame(rows, recodeIDs, colnames, meta);
+	}
+	
+	/**
+	 * Reads a class path resource into a string. The resource is resolved via
+	 * {@code Connection.class} (as before), which keeps relative resource paths
+	 * resolving against the same package.
+	 * 
+	 * @param path resource path
+	 * @return resource content
+	 * @throws IOException if the resource does not exist or cannot be read
+	 */
+	private static String readRequiredResource(String path) 
+		throws IOException
+	{
+		String ret = IOUtilFunctions.toString(Connection.class.getResourceAsStream(path));
+		//assumes toString yields null for a missing resource (null stream), 
+		//as also relied upon by the recode map check above
+		if( ret == null )
+			throw new IOException("Transform meta data resource '"+path+"' not existing.");
+		return ret;
+	}
+	
+	/**
+	 * Parses the number of distinct values of a recoded column.
+	 * 
+	 * @param ndistinct content of the column's number-of-distinct-values file
+	 * @param colName column name (for error messages)
+	 * @return number of distinct values
+	 * @throws IOException if the content is not a valid integer
+	 */
+	private static int parseNumDistinct(String ndistinct, String colName) 
+		throws IOException
+	{
+		try {
+			//trim to tolerate surrounding whitespace, e.g., a trailing newline
+			return Integer.parseInt(ndistinct.trim());
+		}
+		catch(NumberFormatException ex) {
+			throw new IOException("Invalid number of distinct values for column '"
+				+colName+"': '"+ndistinct+"'.", ex);
+		}
+	}
+	
+	/**
+	 * Converts transform meta data into an in-memory FrameBlock object.
+	 * 
+	 * @param rows number of rows of the output frame
+	 * @param recodeIDs 1-based ids of the recoded columns
+	 * @param colnames column names, used as frame column names and as keys into meta
+	 * @param meta map from column name to its serialized recode map (one entry per line)
+	 * @return frame with a string column per input column; each recoded column holds 
+	 *     one recode map entry per row, encoded as key + DATATYPE_PREFIX + value
+	 * @throws IOException if a recode column id is out of range or its recode map does not exist
+	 */
+	private static FrameBlock convertToTransformMetaDataFrame(int rows, List<Integer> recodeIDs, List<String> colnames, HashMap<String,String> meta) 
+		throws IOException
+	{
+		//create frame block w/ pure string schema
+		List<ValueType> schema = Collections.nCopies(colnames.size(), ValueType.STRING);
+		FrameBlock ret = new FrameBlock(schema, colnames);
+		ret.ensureAllocatedColumns(rows);
+		
+		//encode recode maps into frame
+		for( Integer colID : recodeIDs ) {
+			//ids come from the json spec, hence validate before indexing
+			if( colID < 1 || colID > colnames.size() )
+				throw new IOException("Invalid recode column id: "+colID+" (ncol="+colnames.size()+").");
+			String name = colnames.get(colID-1);
+			String map = meta.get(name);
+			if( map == null )
+				throw new IOException("Recode map for column '"+name+"' (id="+colID+") not existing.");
+			
+			//decode with the same charset used for encoding (not the platform default)
+			InputStream is = new ByteArrayInputStream(map.getBytes("UTF-8"));
+			BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+			Pair<String,String> pair = new Pair<String,String>();
+			String line; int rpos = 0;
+			while( (line = br.readLine()) != null ) {
+				DecoderRecode.parseRecodeMapEntry(line, pair);
+				String tmp = pair.getKey() + Lop.DATATYPE_PREFIX + pair.getValue();
+				ret.set(rpos++, colID-1, tmp);
+			}
+		}
+		
+		return ret;
+	}
+	
+	/**
+	 * Parses the given json specification and extracts a list of column ids
+	 * that are subject to recoding.
+	 * 
+	 * @param spec transform specification as json string, or null to derive 
+	 *     the recode ids from the column types
+	 * @param coltypes column types; columns of type "2" are treated as recoded 
+	 *     if no spec is given
+	 * @return list of 1-based recode column ids
+	 * @throws IOException if the specification cannot be parsed
+	 */
+	private static ArrayList<Integer> parseRecodeColIDs(String spec, List<String> coltypes) 
+		throws IOException 
+	{	
+		ArrayList<Integer> specRecodeIDs = new ArrayList<Integer>();
+		
+		try {
+			if( spec != null ) {
+				//parse json transform specification for recode col ids
+				JSONObject jSpec = new JSONObject(spec);
+				if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE))  {
+					Object recode = jSpec.get(TfUtils.TXMETHOD_RECODE);
+					JSONArray attrs = null; //TODO simplify once json spec consolidated
+					if( recode instanceof JSONObject ) {
+						//object form: {"recode": {"attributes": [...]}}
+						attrs = (JSONArray) ((JSONObject) recode).get(TfUtils.JSON_ATTRS);
+					}
+					else {
+						//array form: {"recode": [...]}
+						attrs = (JSONArray) recode;
+					}
+					for(int j=0; j<attrs.length(); j++) 
+						specRecodeIDs.add(UtilFunctions.toInt(attrs.get(j)));
+				}
+			}
+			else {
+				//obtain recode col ids from coltypes 
+				for( int j=0; j<coltypes.size(); j++ )
+					if( coltypes.get(j).equals("2") )
+						specRecodeIDs.add(j+1);
+			}
+		}
+		catch(Exception ex) {
+			throw new IOException(ex);
+		}
+		
+		return specRecodeIDs;
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformReadMetaTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformReadMetaTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformReadMetaTest.java
new file mode 100644
index 0000000..c0e09ba
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformReadMetaTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.transform;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.io.FrameReader;
+import org.apache.sysml.runtime.io.FrameReaderFactory;
+import org.apache.sysml.runtime.io.MatrixWriter;
+import org.apache.sysml.runtime.io.MatrixWriterFactory;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+/**
+ * Tests the transformmeta builtin: the script transforms a generated frame 
+ * (writing its meta data to M1), reads that meta data back via transformmeta, 
+ * and writes it as frame M in csv/text/binary format. M is then compared 
+ * against the meta data read directly from M1 via TfMetaUtils.
+ */
+public class TransformReadMetaTest extends AutomatedTestBase 
+{
+	private static final String TEST_NAME1 = "TransformReadMeta";
+	private static final String TEST_DIR = "functions/transform/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + TransformReadMetaTest.class.getSimpleName() + "/";
+	private static final String SPEC_X = "TransformReadMetaSpecX.json";
+	
+	//number of rows of the generated input (also used as meta data frame rows)
+	private static final int rows = 1432;
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		//register both outputs: transform meta data directory M1 and meta data frame M
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[]{"M1", "M"}));
+	}
+	
+	@Test
+	public void runTestCsvCP() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv");
+	}
+	
+	@Test
+	public void runTestCsvHadoop() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.HADOOP, "csv");
+	}
+
+	@Test
+	public void runTestCsvSpark() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.SPARK, "csv");
+	}
+	
+	@Test
+	public void runTestTextCP() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.SINGLE_NODE, "text");
+	}
+	
+	@Test
+	public void runTestTextHadoop() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.HADOOP, "text");
+	}
+
+	@Test
+	public void runTestTextSpark() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.SPARK, "text");
+	}
+	
+	@Test
+	public void runTestBinaryCP() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.SINGLE_NODE, "binary");
+	}
+	
+	@Test
+	public void runTestBinaryHadoop() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.HADOOP, "binary");
+	}
+
+	@Test
+	public void runTestBinarySpark() throws DMLRuntimeException, IOException {
+		runTransformReadMetaTest(RUNTIME_PLATFORM.SPARK, "binary");
+	}
+	
+	/**
+	 * Runs transform and transformmeta on generated data and compares the meta data 
+	 * frame written by the script against the meta data read via TfMetaUtils.
+	 * 
+	 * @param rt runtime platform to execute the script on
+	 * @param ofmt output format of the meta data frame (csv, text, or binary)
+	 * @throws IOException if test execution or result comparison fails
+	 * @throws DMLRuntimeException
+	 */
+	private void runTransformReadMetaTest( RUNTIME_PLATFORM rt, String ofmt) throws IOException, DMLRuntimeException
+	{
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		rtplatform = rt;
+
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK)
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+		try
+		{
+			getAndLoadTestConfiguration(TEST_NAME1);
+			
+			//generate input data: two seq(0.5, rows/2, 0.5) columns (rows x 2)
+			double[][] X = DataConverter.convertToDoubleMatrix(
+					MatrixBlock.seqOperations(0.5, rows/2, 0.5).appendOperations(
+					MatrixBlock.seqOperations(0.5, rows/2, 0.5), new MatrixBlock()));
+			MatrixBlock mbX = DataConverter.convertToMatrixBlock(X);
+			MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(OutputInfo.CSVOutputInfo);
+			writer.writeMatrixToHDFS(mbX, input("X"), rows, 2, -1, -1, -1);
+			
+			//read transform specification for X
+			String specX = MapReduceTool.readStringFromHDFSFile(SCRIPT_DIR+TEST_DIR+SPEC_X);
+			
+			fullDMLScriptName = SCRIPT_DIR+TEST_DIR + TEST_NAME1 + ".dml";
+			programArgs = new String[]{"-args", input("X"), specX, output("M1"), output("M"), ofmt};
+			
+			//run test
+			runTest(true, false, null, -1); 
+			
+			//compare meta data frames: written frame M vs meta data read from M1
+			InputInfo iinfo = InputInfo.stringExternalToInputInfo(ofmt);
+			FrameReader reader = FrameReaderFactory.createFrameReader(iinfo);
+			FrameBlock mExpected = TfMetaUtils.readTransformMetaDataFromFile(specX, output("M1"), ",");
+			FrameBlock mRet = reader.readFrameFromHDFS(output("M"), rows, 2);
+			for( int i=0; i<rows; i++ )
+				for( int j=0; j<2; j++ ) {
+					Assert.assertTrue("Wrong result: "+mRet.get(i, j)+".", 
+						UtilFunctions.compareTo(ValueType.STRING, mExpected.get(i, j), mRet.get(i, j))==0);
+				}
+		}
+		catch(Exception ex) {
+			throw new IOException(ex);
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/test/scripts/functions/transform/TransformReadMeta.dml
---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/TransformReadMeta.dml b/src/test/scripts/functions/transform/TransformReadMeta.dml new file mode 100644 index 0000000..7328aa9 --- /dev/null +++ b/src/test/scripts/functions/transform/TransformReadMeta.dml @@ -0,0 +1,33 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#
+#-------------------------------------------------------------
+
+# Script arguments (as passed by TransformReadMetaTest):
+#   $1 input frame X (csv), $2 transform specification (json string),
+#   $3 transform meta data directory, $4 output path of meta data frame M,
+#   $5 output format of M
+
+#transform X, writing its transform meta data to $3
+X = read($1, data_type="frame", format="csv");
+specX = $2;
+R1 = transform(target = X, spec = specX, transformPath = $3);
+
+# NOTE(review): presumably an explicit statement-block cut so that transform
+# has written its meta data to $3 before transformmeta reads it - confirm
+if( 1==1 ){}
+
+# consume R1 (presumably so transform is not removed as dead code)
+print(sum(R1));
+
+#transform read meta data and write as frame
+M = transformmeta(spec = specX, transformPath = $3);
+write(M, $4, format=$5);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/test/scripts/functions/transform/TransformReadMetaSpecX.json
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/TransformReadMetaSpecX.json b/src/test/scripts/functions/transform/TransformReadMetaSpecX.json
new file mode 100644
index 0000000..b5c9a84
--- /dev/null
+++ b/src/test/scripts/functions/transform/TransformReadMetaSpecX.json
@@ -0,0 +1,5 @@
+{
+ "ids": true
+ ,"recode": [ 1, 2 ]
+ 
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e5aaaf1e/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
index 0bbe7f3..2bd12b2 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
 	ScalingTest.class,
 	TransformAndApplyTest.class,
 	TransformEncodeDecodeTest.class,
+	TransformReadMetaTest.class,
 	TransformTest.class,
 })
