[SYSTEMML-569] Distributed spark transform encode/decode (recode, dummy) (1) Rework of internal encoder/decoder APIs in order to streamline meta data initialization and avoid redundancy.
(2) Extended cp transform encode/decode for dummycoding, including generalizations for input/output handling (e.g., pass-through encoder/decoder) as well as a new dummycoding decoder. (3) New spark instructions transformencode and transformdecode (compiler/runtime, so far only for recode and dummycode). (4) New test case for transformencode/transformdecode over frames with different exec types. (5) Fixes for frame readers from HDFS and RDDs (schema handling), transform spec parsing (consistent sorted column id lists), and some jmlc tests (valid script/input meta data). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/293c81c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/293c81c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/293c81c6 Branch: refs/heads/master Commit: 293c81c6cae5dbd187b19e09806821f2f2f7db0d Parents: 7d4acdd Author: Matthias Boehm <[email protected]> Authored: Mon Jun 20 12:41:46 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Jun 23 17:56:40 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/FunctionOp.java | 36 +-- .../sysml/hops/ParameterizedBuiltinOp.java | 2 +- .../RewriteRemovePersistentReadWrite.java | 6 +- .../org/apache/sysml/lops/FunctionCallCP.java | 20 +- .../controlprogram/caching/FrameObject.java | 27 +- .../functionobjects/ParameterizedBuiltin.java | 11 +- .../instructions/SPInstructionParser.java | 6 + .../cp/ParameterizedBuiltinCPInstruction.java | 7 +- ...ReturnParameterizedBuiltinSPInstruction.java | 269 +++++++++++++++++++ .../ParameterizedBuiltinSPInstruction.java | 86 +++++- .../instructions/spark/SPInstruction.java | 2 +- .../apache/sysml/runtime/io/FrameReader.java | 20 +- .../sysml/runtime/matrix/data/FrameBlock.java | 12 + .../sysml/runtime/transform/BinAgent.java | 43 ++- 
.../sysml/runtime/transform/DataTransform.java | 23 -- .../sysml/runtime/transform/DummycodeAgent.java | 40 ++- .../sysml/runtime/transform/MVImputeAgent.java | 103 ++++--- .../sysml/runtime/transform/OmitAgent.java | 33 +-- .../sysml/runtime/transform/RecodeAgent.java | 174 ++++++------ .../sysml/runtime/transform/decode/Decoder.java | 27 +- .../transform/decode/DecoderComposite.java | 17 +- .../transform/decode/DecoderDummycode.java | 73 +++++ .../transform/decode/DecoderFactory.java | 63 ++--- .../transform/decode/DecoderPassThrough.java | 52 ++-- .../runtime/transform/decode/DecoderRecode.java | 73 ++--- .../sysml/runtime/transform/encode/Encoder.java | 41 ++- .../transform/encode/EncoderComposite.java | 57 ++-- .../transform/encode/EncoderFactory.java | 32 +-- .../transform/encode/EncoderPassThrough.java | 33 +-- .../runtime/transform/meta/TfMetaUtils.java | 5 +- .../functions/jmlc/FrameReadMetaTest.java | 4 +- .../transform/TransformFrameApplyTest.java | 194 +++++++++++++ .../TransformFrameEncodeDecodeTest.java | 146 ++++++++++ .../functions/transform/TransformFrameTest.java | 194 ------------- .../org/apache/sysml/test/utils/TestUtils.java | 21 ++ .../jmlc/tfmtd_example2/Recode/district.map | 4 + .../jmlc/tfmtd_example2/Recode/district.mode | 1 + .../tfmtd_example2/Recode/district.ndistinct | 1 + .../jmlc/tfmtd_example2/Recode/floors.map | 3 + .../jmlc/tfmtd_example2/Recode/floors.mode | 1 + .../jmlc/tfmtd_example2/Recode/floors.ndistinct | 1 + .../jmlc/tfmtd_example2/Recode/numbathrooms.map | 5 + .../tfmtd_example2/Recode/numbathrooms.mode | 1 + .../Recode/numbathrooms.ndistinct | 1 + .../jmlc/tfmtd_example2/Recode/numbedrooms.map | 7 + .../jmlc/tfmtd_example2/Recode/numbedrooms.mode | 1 + .../tfmtd_example2/Recode/numbedrooms.ndistinct | 1 + .../jmlc/tfmtd_example2/Recode/view.map | 2 + .../jmlc/tfmtd_example2/Recode/view.mode | 1 + .../jmlc/tfmtd_example2/Recode/view.ndistinct | 1 + .../jmlc/tfmtd_example2/Recode/zipcode.map | 5 + 
.../jmlc/tfmtd_example2/Recode/zipcode.mode | 1 + .../tfmtd_example2/Recode/zipcode.ndistinct | 1 + .../functions/jmlc/tfmtd_example2/coltypes.csv | 1 + .../functions/jmlc/tfmtd_example2/column.names | 1 + .../functions/jmlc/tfmtd_example2/spec.json | 1 + src/test/scripts/functions/jmlc/transform5.dml | 1 + .../transform/TransformFrameEncodeDecode.dml | 32 +++ .../functions/transform/ZPackageSuite.java | 3 +- 59 files changed, 1337 insertions(+), 692 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/hops/FunctionOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/FunctionOp.java b/src/main/java/org/apache/sysml/hops/FunctionOp.java index ae199b5..03db4da 100644 --- a/src/main/java/org/apache/sysml/hops/FunctionOp.java +++ b/src/main/java/org/apache/sysml/hops/FunctionOp.java @@ -214,16 +214,13 @@ public class FunctionOp extends Hop ExecType et = optFindExecType(); - if ( et != ExecType.CP ) { - throw new HopsException("Invalid execution type for function: " + _fname); - } //construct input lops (recursive) ArrayList<Lop> tmp = new ArrayList<Lop>(); for( Hop in : getInput() ) tmp.add( in.constructLops() ); //construct function call - FunctionCallCP fcall = new FunctionCallCP( tmp, _fnamespace, _fname, _outputs, _outputHops ); + FunctionCallCP fcall = new FunctionCallCP( tmp, _fnamespace, _fname, _outputs, _outputHops, et ); setLineNumbers( fcall ); setLops( fcall ); @@ -242,23 +239,30 @@ public class FunctionOp extends Hop protected ExecType optFindExecType() throws HopsException { + checkAndSetForcedPlatform(); + if ( getFunctionType() == FunctionType.MULTIRETURN_BUILTIN ) { - // Since the memory estimate is only conservative, do not throw - // exception if the estimated memory is larger than the budget - // Nevertheless, memory estimates these functions 
are useful for - // other purposes, such as compiling parfor - return ExecType.CP; // check if there is sufficient memory to execute this function - /*if ( getMemEstimate() < OptimizerUtils.getMemBudget(true) ) { - return ExecType.CP; - } + if( getFunctionName().equalsIgnoreCase("transformencode") ) { + _etype = ((_etypeForced==ExecType.SPARK + || (getMemEstimate() >= OptimizerUtils.getLocalMemBudget() + && OptimizerUtils.isSparkExecutionMode())) ? ExecType.SPARK : ExecType.CP); + } else { - throw new HopsException("Insufficient memory to execute function: " + getFunctionName()); - }*/ + // Since the memory estimate is only conservative, do not throw + // exception if the estimated memory is larger than the budget + // Nevertheless, memory estimates these functions are useful for + // other purposes, such as compiling parfor + _etype = ExecType.CP; + } } - // the actual function call is always CP - return ExecType.CP; + else { + // the actual function call is always CP + _etype = ExecType.CP; + } + + return _etype; } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java index 02d3715..f1ca98c 100644 --- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java +++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java @@ -1121,7 +1121,7 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop //force CP for in-memory only transform builtins if( (_op == ParamBuiltinOp.TRANSFORMAPPLY && REMOTE==ExecType.MR) - || _op == ParamBuiltinOp.TRANSFORMDECODE + || _op == ParamBuiltinOp.TRANSFORMDECODE && REMOTE==ExecType.MR || _op == ParamBuiltinOp.TRANSFORMMETA || _op == ParamBuiltinOp.TOSTRING) { _etype = ExecType.CP; 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java index 89812f7..74db35a 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java @@ -110,15 +110,17 @@ public class RewriteRemovePersistentReadWrite extends HopRewriteRule if (hop.getDataType() == DataType.SCALAR) { dop.removeInput("iofilename"); } - } else + } + else LOG.warn("Non-registered persistent read of variable '"+dop.getName()+"' (line "+dop.getBeginLine()+")."); break; case PERSISTENTWRITE: - if( _outputs.contains(dop.getName()) ) + if( _outputs.contains(dop.getName()) ) { dop.setDataOpType(DataOpTypes.TRANSIENTWRITE); if (hop.getDataType() == DataType.SCALAR) { dop.removeInput("iofilename"); } + } else LOG.warn("Non-registered persistent write of variable '"+dop.getName()+"' (line "+dop.getBeginLine()+")."); break; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/lops/FunctionCallCP.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/FunctionCallCP.java b/src/main/java/org/apache/sysml/lops/FunctionCallCP.java index 9cbc9c1..cea01af 100644 --- a/src/main/java/org/apache/sysml/lops/FunctionCallCP.java +++ b/src/main/java/org/apache/sysml/lops/FunctionCallCP.java @@ -42,17 +42,18 @@ public class FunctionCallCP extends Lop private String[] _outputs; private ArrayList<Lop> _outputLops = null; - public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] outputs, ArrayList<Hop> 
outputHops) throws HopsException, LopsException { - this(inputs, fnamespace, fname, outputs); + public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] outputs, ArrayList<Hop> outputHops, ExecType et) + throws HopsException, LopsException + { + this(inputs, fnamespace, fname, outputs, et); if(outputHops != null) { _outputLops = new ArrayList<Lop>(); - for(Hop h : outputHops) { + for(Hop h : outputHops) _outputLops.add( h.constructLops() ); - } } } - public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] outputs) + public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String fname, String[] outputs, ExecType et) { super(Lop.Type.FunctionCallCP, DataType.UNKNOWN, ValueType.UNKNOWN); //note: data scalar in order to prevent generation of redundant createvar, rmvar @@ -62,8 +63,7 @@ public class FunctionCallCP extends Lop _outputs = outputs; //wire inputs - for( Lop in : inputs ) - { + for( Lop in : inputs ) { addInput( in ); in.addOutput( this ); } @@ -73,7 +73,7 @@ public class FunctionCallCP extends Lop boolean aligner = false; boolean definesMRJob = false; lps.addCompatibility(JobType.INVALID); - this.lps.setProperties(inputs, ExecType.CP, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob ); + lps.setProperties(inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob ); } public ArrayList<Lop> getFunctionOutputs() { @@ -87,7 +87,7 @@ public class FunctionCallCP extends Lop private String getInstructionsMultipleReturnBuiltins(String[] inputs, String[] outputs) { StringBuilder sb = new StringBuilder(); - sb.append("CP"); + sb.append(getExecType()); sb.append(Lop.OPERAND_DELIMITOR); sb.append(_fname.toLowerCase()); @@ -123,8 +123,8 @@ public class FunctionCallCP extends Lop //NOTE: we have to append full input operand information to distinguish literals from variables w/ equal name StringBuilder inst = new StringBuilder(); + inst.append(getExecType()); - 
inst.append("CP"); inst.append(Lop.OPERAND_DELIMITOR); inst.append("extfunct"); inst.append(Lop.OPERAND_DELIMITOR); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java index 04d1543..db98a3e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java @@ -177,14 +177,6 @@ public class FrameObject extends CacheableData<FrameBlock> return data; } - /** - * Read Frame object from RDD - * - * @param rdd - * @param status - * - * @param fo - */ @Override protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status) throws IOException @@ -198,26 +190,27 @@ public class FrameObject extends CacheableData<FrameBlock> MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData; MatrixCharacteristics mc = iimd.getMatrixCharacteristics(); + int rlen = (int)mc.getRows(); + int clen = (int)mc.getCols(); + + //handle missing schema if necessary + List<ValueType> lschema = (_schema!=null) ? _schema : + Collections.nCopies(clen>=1 ? 
(int)clen : 1, ValueType.STRING); FrameBlock fb = null; - try - { + try { //prevent unnecessary collect through rdd checkpoint if( rdd.allowsShortCircuitCollect() ) { lrdd = (RDDObject)rdd.getLineageChilds().get(0); } - //obtain frame block from RDD - int rlen = (int)mc.getRows(); - int clen = (int)mc.getCols(); - - //collect frame block from binary cell RDD - fb = SparkExecutionContext.toFrameBlock(lrdd, _schema, rlen, clen); + //collect frame block from binary block RDD + fb = SparkExecutionContext.toFrameBlock(lrdd, lschema, rlen, clen); } catch(DMLRuntimeException ex) { throw new IOException(ex); } - + //sanity check correct output if( fb == null ) { throw new IOException("Unable to load frame from rdd: "+lrdd.getVarName()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java index 2bd19fa..c542328 100644 --- a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java +++ b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java @@ -44,8 +44,11 @@ public class ParameterizedBuiltin extends ValueFunction private static final long serialVersionUID = -5966242955816522697L; - public enum ParameterizedBuiltinCode { INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, TRANSFORM, TRANSFORMAPPLY }; - public enum ProbabilityDistributionCode { INVALID, NORMAL, EXP, CHISQ, F, T }; + public enum ParameterizedBuiltinCode { + INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, + TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE }; + public enum ProbabilityDistributionCode { + INVALID, NORMAL, EXP, CHISQ, F, T }; public ParameterizedBuiltinCode bFunc; public ProbabilityDistributionCode distFunc; @@ 
-61,6 +64,7 @@ public class ParameterizedBuiltin extends ValueFunction String2ParameterizedBuiltinCode.put( "rexpand", ParameterizedBuiltinCode.REXPAND); String2ParameterizedBuiltinCode.put( "transform", ParameterizedBuiltinCode.TRANSFORM); String2ParameterizedBuiltinCode.put( "transformapply", ParameterizedBuiltinCode.TRANSFORMAPPLY); + String2ParameterizedBuiltinCode.put( "transformdecode", ParameterizedBuiltinCode.TRANSFORMDECODE); } static public HashMap<String, ProbabilityDistributionCode> String2DistCode; @@ -170,6 +174,9 @@ public class ParameterizedBuiltin extends ValueFunction case TRANSFORMAPPLY: return new ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMAPPLY); + + case TRANSFORMDECODE: + return new ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMDECODE); default: throw new DMLRuntimeException("Invalid parameterized builtin code: " + code); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index 663e6b4..e0c4631 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -56,6 +56,7 @@ import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction; import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction; import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction; +import org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction; import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction; import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction; import org.apache.sysml.runtime.instructions.spark.PmmSPInstruction; @@ -213,6 +214,8 @@ public class SPInstructionParser extends InstructionParser String2SPInstructionType.put( "rexpand" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "transform" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "transformapply",SPINSTRUCTION_TYPE.ParameterizedBuiltin); + String2SPInstructionType.put( "transformdecode",SPINSTRUCTION_TYPE.ParameterizedBuiltin); + String2SPInstructionType.put( "transformencode",SPINSTRUCTION_TYPE.MultiReturnBuiltin); String2SPInstructionType.put( "mappend", SPINSTRUCTION_TYPE.MAppend); String2SPInstructionType.put( "rappend", SPINSTRUCTION_TYPE.RAppend); @@ -367,6 +370,9 @@ public class SPInstructionParser extends InstructionParser case ParameterizedBuiltin: return ParameterizedBuiltinSPInstruction.parseInstruction(str); + case MultiReturnBuiltin: + return MultiReturnParameterizedBuiltinSPInstruction.parseInstruction(str); + case MatrixReshape: return MatrixReshapeSPInstruction.parseInstruction(str); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java index e6a536d..eca1627 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java @@ -43,6 +43,8 @@ import org.apache.sysml.runtime.transform.DataTransform; import org.apache.sysml.runtime.transform.TfUtils; import 
org.apache.sysml.runtime.transform.decode.Decoder; import org.apache.sysml.runtime.transform.decode.DecoderFactory; +import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.encode.EncoderFactory; import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.DataConverter; @@ -269,7 +271,8 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction FrameBlock meta = ec.getFrameInput(params.get("meta")); //compute transformapply - MatrixBlock mbout = DataTransform.cpDataTransform(getParameterMap(), data, meta ); + Encoder encoder = EncoderFactory.createEncoder(params.get("spec"), data.getNumColumns(), meta); + MatrixBlock mbout = encoder.apply(data, new MatrixBlock(data.getNumRows(), data.getNumColumns(), false)); //release locks ec.setMatrixOutput(output.getName(), mbout); @@ -283,7 +286,7 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction //compute transformdecode Decoder decoder = DecoderFactory.createDecoder(getParameterMap().get("spec"), null, meta); - FrameBlock fbout = decoder.decode(data, new FrameBlock(data.getNumColumns(), ValueType.STRING)); + FrameBlock fbout = decoder.decode(data, new FrameBlock(meta.getNumColumns(), ValueType.STRING)); //release locks ec.setFrameOutput(output.getName(), fbout); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java new file mode 100644 index 0000000..fc9e9ce --- /dev/null +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.instructions.spark; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.spark.Accumulator; +import org.apache.spark.AccumulatorParam; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.sysml.parser.Expression.DataType; +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.caching.FrameObject; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.instructions.cp.CPOperand; +import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyFunction; +import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyOffsetFunction; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; +import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; +import org.apache.sysml.runtime.io.FrameReader; +import org.apache.sysml.runtime.io.FrameReaderFactory; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.transform.RecodeAgent; +import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.encode.EncoderComposite; +import org.apache.sysml.runtime.transform.encode.EncoderFactory; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; +import org.apache.sysml.runtime.transform.meta.TfOffsetMap; + +import scala.Tuple2; + + +public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPInstruction +{ + protected ArrayList<CPOperand> _outputs; + + public MultiReturnParameterizedBuiltinSPInstruction(Operator op, CPOperand input1, CPOperand input2, ArrayList<CPOperand> outputs, String opcode, String istr ) { + super(op, input1, input2, outputs.get(0), opcode, istr); + _sptype = SPINSTRUCTION_TYPE.MultiReturnBuiltin; + _outputs = outputs; + } + + public CPOperand getOutput(int i) { + return _outputs.get(i); + } + + /** + * + * @param str + * @return + * @throws DMLRuntimeException + */ + public static MultiReturnParameterizedBuiltinSPInstruction parseInstruction( String str ) + throws DMLRuntimeException + { + String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str); + ArrayList<CPOperand> outputs = new ArrayList<CPOperand>(); + String opcode = parts[0]; + + if ( opcode.equalsIgnoreCase("transformencode") ) { + // one input and two outputs + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + outputs.add ( new CPOperand(parts[3], ValueType.DOUBLE, DataType.MATRIX) ); + outputs.add ( new CPOperand(parts[4], ValueType.STRING, DataType.FRAME) ); + return new MultiReturnParameterizedBuiltinSPInstruction(null, in1, in2, outputs, opcode, str); + } + else { + throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode); + } + + } + + @Override + @SuppressWarnings("unchecked") + public void processInstruction(ExecutionContext ec) + throws DMLRuntimeException + { + SparkExecutionContext sec = (SparkExecutionContext) ec; + + try + { + //get input RDD and meta data + FrameObject fo = sec.getFrameObject(input1.getName()); + FrameObject fometa = sec.getFrameObject(_outputs.get(1).getName()); + JavaPairRDD<Long,FrameBlock> in = (JavaPairRDD<Long,FrameBlock>) + sec.getRDDHandleForFrameObject(fo, InputInfo.BinaryBlockInputInfo); + String spec = ec.getScalarInput(input2.getName(), input2.getValueType(), input2.isLiteral()).getStringValue(); + MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(input1.getName()); + MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); + + //step 1: build transform meta data + Encoder encoderBuild = EncoderFactory.createEncoder(spec, + fo.getSchema(), (int)fo.getNumColumns(), null); + + Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc()); + in.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild)) + .distinct().groupByKey() + .flatMap(new TransformEncodeGroupFunction(accMax)) + .saveAsTextFile(fometa.getFileName()); //trigger eval + + //reuse multi-threaded reader + FrameReader reader = 
FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo); + FrameBlock meta = reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(), fo.getNumColumns()); + meta.recomputeColumnCardinality(); //recompute num distinct items per column + + //step 2: transform apply (similar to spark transformapply) + //compute omit offset map for block shifts + TfOffsetMap omap = null; + if( TfMetaUtils.containsOmitSpec(spec) ) { + omap = new TfOffsetMap(SparkUtils.toIndexedLong(in.mapToPair( + new RDDTransformApplyOffsetFunction(spec)).collect())); + } + + //create encoder broadcast (avoiding replication per task) + Encoder encoder = EncoderFactory.createEncoder(spec, + fo.getSchema(), (int)fo.getNumColumns(), meta); + mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), encoder.getNumCols()); + Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder); + Broadcast<TfOffsetMap> bomap = (omap!=null) ? sec.getSparkContext().broadcast(omap) : null; + + //execute transform apply + JavaPairRDD<Long,FrameBlock> tmp = in + .mapToPair(new RDDTransformApplyFunction(bmeta, bomap)); + JavaPairRDD<MatrixIndexes,MatrixBlock> out = FrameRDDConverterUtils + .binaryBlockToMatrixBlock(tmp, mcOut, mcOut); + + //set output and maintain lineage/output characteristics + sec.setRDDHandleForVariable(_outputs.get(0).getName(), out); + sec.addLineageRDD(_outputs.get(0).getName(), input1.getName()); + sec.setFrameOutput(_outputs.get(1).getName(), meta); + } + catch(IOException ex) { + throw new RuntimeException(ex); + } + } + + /** + * + */ + public static class TransformEncodeBuildFunction implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, String> + { + private static final long serialVersionUID = 6336375833412029279L; + + private Encoder _encoder = null; + + public TransformEncodeBuildFunction(Encoder encoder) { + _encoder = encoder; + } + + @Override + public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<Long, FrameBlock>> 
iter) + throws Exception + { + //build meta data (e.g., recode maps) + while( iter.hasNext() ) { + _encoder.build(iter.next()._2()); + } + + //output recode maps as columnID - token pairs + ArrayList<Tuple2<Integer,String>> ret = new ArrayList<Tuple2<Integer,String>>(); + if( _encoder instanceof EncoderComposite ) + for( Encoder cEncoder : ((EncoderComposite)_encoder).getEncoders() ) + if( cEncoder instanceof RecodeAgent ) { + RecodeAgent ra = (RecodeAgent) cEncoder; + HashMap<Integer,HashMap<String,Long>> tmp = ra.getCPRecodeMaps(); + for( Entry<Integer,HashMap<String,Long>> e1 : tmp.entrySet() ) + for( String token : e1.getValue().keySet() ) + ret.add(new Tuple2<Integer,String>(e1.getKey(), token)); + } + + return ret; + } + } + + /** + * + */ + public static class TransformEncodeGroupFunction implements FlatMapFunction<Tuple2<Integer, Iterable<String>>, String> + { + private static final long serialVersionUID = -1034187226023517119L; + + private Accumulator<Long> _accMax = null; + + public TransformEncodeGroupFunction( Accumulator<Long> accMax ) { + _accMax = accMax; + } + + @Override + public Iterable<String> call(Tuple2<Integer, Iterable<String>> arg0) + throws Exception + { + String colID = String.valueOf(arg0._1()); + Iterator<String> iter = arg0._2().iterator(); + + ArrayList<String> ret = new ArrayList<String>(); + StringBuilder sb = new StringBuilder(); + long rowID = 1; + while( iter.hasNext() ) { + sb.append(rowID); + sb.append(' '); + sb.append(colID); + sb.append(' '); + sb.append(RecodeAgent.constructRecodeMapEntry(iter.next(), rowID)); + ret.add(sb.toString()); + sb.setLength(0); + rowID++; + } + _accMax.add(rowID-1); + + return ret; + } + } + + /** + * + */ + private static class MaxAcc implements AccumulatorParam<Long>, Serializable + { + private static final long serialVersionUID = -3739727823287550826L; + + @Override + public Long addInPlace(Long arg0, Long arg1) { + return Math.max(arg0, arg1); + } + + @Override + public Long zero(Long arg0) { 
+ return arg0; + } + + @Override + public Long addAccumulator(Long arg0, Long arg1) { + return Math.max(arg0, arg1); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index a80dd1b..25a4078 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -37,6 +37,7 @@ import org.apache.sysml.parser.Statement; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.functionobjects.KahanPlus; @@ -72,6 +73,8 @@ import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.SimpleOperator; import org.apache.sysml.runtime.transform.DataTransform; import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.decode.Decoder; +import org.apache.sysml.runtime.transform.decode.DecoderFactory; import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.transform.encode.EncoderFactory; import org.apache.sysml.runtime.transform.meta.TfMetaUtils; @@ -169,7 +172,8 @@ public class ParameterizedBuiltinSPInstruction extends 
ComputationSPInstruction else if( opcode.equalsIgnoreCase("rexpand") || opcode.equalsIgnoreCase("replace") || opcode.equalsIgnoreCase("transform") - || opcode.equalsIgnoreCase("transformapply")) + || opcode.equalsIgnoreCase("transformapply") + || opcode.equalsIgnoreCase("transformdecode")) { func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode); return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false); @@ -450,6 +454,30 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction sec.addLineageRDD(output.getName(), params.get("target")); ec.releaseFrameInput(params.get("meta")); } + else if ( opcode.equalsIgnoreCase("transformdecode") ) + { + //get input RDD and meta data + JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(params.get("target")); + MatrixCharacteristics mc = sec.getMatrixCharacteristics(params.get("target")); + FrameBlock meta = sec.getFrameInput(params.get("meta")); + + //reblock if necessary (clen > bclen) + if( mc.getCols() > mc.getNumColBlocks() ) { + in = in.mapToPair(new RDDTransformDecodeExpandFunction( + (int)mc.getCols(), mc.getColsPerBlock())); + in = RDDAggregateUtils.mergeByKey(in); + } + + //construct decoder and decode individual matrix blocks + Decoder decoder = DecoderFactory.createDecoder(params.get("spec"), null, meta); + JavaPairRDD<Long,FrameBlock> out = in.mapToPair( + new RDDTransformDecodeFunction(decoder, meta.getNumColumns(), mc.getRowsPerBlock())); + + //set output and maintain lineage/output characteristics + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), params.get("target")); + ec.releaseFrameInput(params.get("meta")); + } else { throw new DMLRuntimeException("Unknown parameterized builtin opcode: "+opcode); } @@ -776,6 +804,62 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction /** * + */ + public static class 
RDDTransformDecodeFunction implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock> + { + private static final long serialVersionUID = -4797324742568170756L; + + private Decoder _decoder = null; + private int _clen = -1; + private int _brlen = -1; + + public RDDTransformDecodeFunction(Decoder decoder, int clen, int brlen) { + _decoder = decoder; + _clen = clen; + _brlen = brlen; + } + + @Override + public Tuple2<Long,FrameBlock> call(Tuple2<MatrixIndexes, MatrixBlock> in) + throws Exception + { + long rix = UtilFunctions.computeCellIndex(in._1().getRowIndex(), _brlen, 0); + return new Tuple2<Long, FrameBlock>(rix, + _decoder.decode(in._2(), new FrameBlock(_clen, ValueType.STRING))); + } + } + + public static class RDDTransformDecodeExpandFunction implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,MatrixIndexes,MatrixBlock> + { + private static final long serialVersionUID = -8187400248076127598L; + + private int _clen = -1; + private int _bclen = -1; + + public RDDTransformDecodeExpandFunction(int clen, int bclen) { + _clen = clen; + _bclen = bclen; + } + + @Override + public Tuple2<MatrixIndexes,MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> in) + throws Exception + { + MatrixIndexes inIx = in._1(); + MatrixBlock inBlk = in._2(); + + //construct expanded block via leftindexing + int cl = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, 0)-1; + int cu = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, inBlk.getNumColumns()-1)-1; + MatrixBlock out = new MatrixBlock(inBlk.getNumRows(), _clen, false); + out = out.leftIndexingOperations(inBlk, 0, inBlk.getNumRows()-1, cl, cu, null, UpdateType.INPLACE_PINNED); + + return new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(inIx.getRowIndex(), 1), out); + } + } + + /** + * * @param mc1 * @param mcOut * @param out 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java index acc2ff6..d9c49aa 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java @@ -40,7 +40,7 @@ public abstract class SPInstruction extends Instruction public enum SPINSTRUCTION_TYPE { MAPMM, MAPMMCHAIN, CPMM, RMM, TSMM, PMM, ZIPMM, PMAPMM, //matrix multiplication instructions MatrixIndexing, Reorg, ArithmeticBinary, RelationalBinary, AggregateUnary, AggregateTernary, Reblock, CSVReblock, - Builtin, BuiltinUnary, BuiltinBinary, Checkpoint, Cast, + Builtin, BuiltinUnary, BuiltinBinary, MultiReturnBuiltin, Checkpoint, Cast, CentralMoment, Covariance, QSort, QPick, ParameterizedBuiltin, MAppend, RAppend, GAppend, GAlignedAppend, Rand, MatrixReshape, Ternary, Quaternary, CumsumAggregate, CumsumOffset, BinUaggChain, UaggOuterChain, http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java index 6a4b469..d37bbde 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java @@ -53,8 +53,7 @@ public abstract class FrameReader * @param clen * @return */ - public abstract FrameBlock readFrameFromHDFS( String fname, List<ValueType> schema, List<String> names, - long rlen, long clen) + public abstract FrameBlock readFrameFromHDFS( 
String fname, List<ValueType> schema, List<String> names, long rlen, long clen) throws IOException, DMLRuntimeException; /** @@ -89,13 +88,11 @@ public abstract class FrameReader * @param iNumColumns * @return */ - public List<ValueType> getDefSchema( long lNumColumns ) + public List<ValueType> getDefSchema( long clen ) throws IOException, DMLRuntimeException { - List<ValueType> schema = new ArrayList<ValueType>(); - for (int i=0; i < lNumColumns; ++i) - schema.add(ValueType.STRING); - return schema; + int lclen = Math.max((int)clen, 1); + return Collections.nCopies(lclen, ValueType.STRING); } /** @@ -103,11 +100,11 @@ public abstract class FrameReader * @param iNumColumns * @return */ - public List<String> getDefColNames( long lNumColumns ) + public List<String> getDefColNames( long clen ) throws IOException, DMLRuntimeException { List<String> colNames = new ArrayList<String>(); - for (int i=0; i < lNumColumns; ++i) + for (int i=0; i < clen; ++i) colNames.add("C"+i); return colNames; } @@ -183,7 +180,7 @@ public abstract class FrameReader * @return */ protected static List<String> createOutputNames(List<String> names, long ncol) { - if( names.size()==1 && ncol > 1 ) + if( names.size() != ncol ) return FrameBlock.createColNames((int)ncol); return names; } @@ -203,7 +200,6 @@ public abstract class FrameReader //check for empty file if( MapReduceTool.isFileEmpty( fs, path.toString() ) ) - throw new EOFException("Empty input file "+ path.toString() +"."); - + throw new EOFException("Empty input file "+ path.toString() +"."); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index e51bd9d..e5c5fec 100644 --- 
a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -277,6 +277,18 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable return "C" + i; } + /** + * + */ + public void recomputeColumnCardinality() { + for( int j=0; j<getNumColumns(); j++ ) { + int card = 0; + for( int i=0; i<getNumRows(); i++ ) + card += (get(i, j) != null) ? 1 : 0; + _colmeta.get(j).setNumDistinct(card); + } + } + /////// // basic get and set functionality http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java index 3ed9a1d..fe83627 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java @@ -315,6 +315,18 @@ public class BinAgent extends Encoder } } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + build(in); + return apply(in, out); + } + + @Override + public void build(FrameBlock in) { + // TODO Auto-generated method stub + } + /** * Method to apply transformations. 
* @@ -360,40 +372,15 @@ public class BinAgent extends Encoder } return out; } - - @Override - public double[] encode(String[] in, double[] out) { - // TODO Auto-generated method stub - return null; - } - - @Override - public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - build(in); - return apply(in, out); - } - - @Override - public void build(String[] in) { - // TODO Auto-generated method stub - } @Override - public void build(FrameBlock in) { - // TODO Auto-generated method stub - } - - @Override - public FrameBlock getMetaData(FrameBlock out) { + public FrameBlock getMetaData(FrameBlock meta) { // TODO Auto-generated method stub return null; } - /** - * - * @param meta - */ - public void initBins(FrameBlock meta) { + @Override + public void initMetaData(FrameBlock meta) { _binMins = new double[_colList.length][]; _binMaxs = new double[_colList.length][]; for( int j=0; j<_colList.length; j++ ) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java index c17b662..c1e1702 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java @@ -74,13 +74,10 @@ import org.apache.sysml.runtime.matrix.JobReturn; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FileFormatProperties; -import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import 
org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; -import org.apache.sysml.runtime.transform.encode.Encoder; -import org.apache.sysml.runtime.transform.encode.EncoderFactory; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.utils.JSONHelper; @@ -1043,26 +1040,6 @@ public class DataTransform } /** - * Apply given transform metadata (incl recode maps) over an in-memory frame input in order to - * create a transformed numerical matrix. Note: The number of rows always remains unchanged, - * whereas the number of column might increase or decrease. - * - * @param params - * @param input - * @param meta - * @param spec - * @return - * @throws DMLRuntimeException - * @throws - */ - public static MatrixBlock cpDataTransform(HashMap<String,String> params, FrameBlock input, FrameBlock meta) - throws DMLRuntimeException - { - Encoder encoder = EncoderFactory.createEncoder(params.get("spec"), input.getNumColumns(), meta); - return encoder.apply(input, new MatrixBlock(input.getNumRows(), input.getNumColumns(), false)); - } - - /** * Helper function to fetch and sort the list of part files under the given * input directory. 
* http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java index bcb06df..aeeb62b 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java @@ -377,10 +377,20 @@ public class DummycodeAgent extends Encoder } } _dummycodedLength += _domainSizes[i]-1; - //System.out.println("colID=" + colID + ", domainsize=" + _domainSizes[i] + ", dcdLength=" + _dummycodedLength); } } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + return apply(in, out); + } + + @Override + public void build(FrameBlock in) { + //do nothing + } + /** * Method to apply transformations. * @@ -444,39 +454,19 @@ public class DummycodeAgent extends Encoder } @Override - public double[] encode(String[] in, double[] out) { - //TODO - return null; - } - - @Override - public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - return apply(in, out); - } - - @Override - public void build(String[] in) { - //do nothing - } - - @Override - public void build(FrameBlock in) { - //do nothing - } - - @Override public FrameBlock getMetaData(FrameBlock out) { - return null; + return out; } - public void initDomainSizes(FrameBlock meta) { + @Override + public void initMetaData(FrameBlock meta) { //initialize domain sizes and output num columns _domainSizes = new int[_colList.length]; _dummycodedLength = _clen; for( int j=0; j<_colList.length; j++ ) { int colID = _colList[j]; //1-based _domainSizes[j] = (int)meta.getColumnMetadata().get(colID-1).getNumDistinct(); - _dummycodedLength += _domainSizes[j]; + _dummycodedLength += _domainSizes[j]-1; } } } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java index 1aeb465..344693c 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java @@ -96,7 +96,7 @@ public class MVImputeAgent extends Encoder private String[] _replacementList = null; // replacements: for global_mean, mean; and for global_mode, recode id of mode category private String[] _NAstrings = null; - + private List<Integer> _rcList = null; public String[] getReplacements() { return _replacementList; } public KahanObject[] getMeans() { return _meanList; } @@ -890,12 +890,43 @@ public class MVImputeAgent extends Encoder } } - /** - * Method to apply transformations. - * - * @param words - * @return - */ + public MVMethod getMethod(int colID) { + int idx = isApplicable(colID); + if(idx == -1) + return MVMethod.INVALID; + + switch(_mvMethodList[idx]) + { + case 1: return MVMethod.GLOBAL_MEAN; + case 2: return MVMethod.GLOBAL_MODE; + case 3: return MVMethod.CONSTANT; + default: return MVMethod.INVALID; + } + + } + + public long getNonMVCount(int colID) { + int idx = isApplicable(colID); + return (idx == -1) ? 0 : _countList[idx]; + } + + public String getReplacement(int colID) { + int idx = isApplicable(colID); + return (idx == -1) ? 
null : _replacementList[idx]; + } + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void build(FrameBlock in) { + // TODO Auto-generated method stub + + } + @Override public String[] apply(String[] words) { @@ -926,32 +957,6 @@ public class MVImputeAgent extends Encoder return words; } - public MVMethod getMethod(int colID) { - int idx = isApplicable(colID); - if(idx == -1) - return MVMethod.INVALID; - - switch(_mvMethodList[idx]) - { - case 1: return MVMethod.GLOBAL_MEAN; - case 2: return MVMethod.GLOBAL_MODE; - case 3: return MVMethod.CONSTANT; - default: return MVMethod.INVALID; - } - - } - - public long getNonMVCount(int colID) { - int idx = isApplicable(colID); - return (idx == -1) ? 0 : _countList[idx]; - } - - public String getReplacement(int colID) { - int idx = isApplicable(colID); - return (idx == -1) ? null : _replacementList[idx]; - } - - @Override public MatrixBlock apply(FrameBlock in, MatrixBlock out) { for(int i=0; i<in.getNumRows(); i++) { @@ -965,26 +970,6 @@ public class MVImputeAgent extends Encoder } @Override - public double[] encode(String[] in, double[] out) { - // TODO Auto-generated method stub - return null; - } - @Override - public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - // TODO Auto-generated method stub - return null; - } - @Override - public void build(String[] in) { - // TODO Auto-generated method stub - - } - @Override - public void build(FrameBlock in) { - // TODO Auto-generated method stub - - } - @Override public FrameBlock getMetaData(FrameBlock out) { // TODO Auto-generated method stub return null; @@ -995,14 +980,14 @@ public class MVImputeAgent extends Encoder * @param meta * @param rcList */ - public void initReplacementList(FrameBlock meta, List<Integer> rcList) { + public void initMetaData(FrameBlock meta) { //init replacement lists, replace recoded values to //apply mv imputation potentially after 
recoding _replacementList = new String[_colList.length]; for( int j=0; j<_colList.length; j++ ) { int colID = _colList[j]; String mvVal = UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); - if( rcList.contains(colID) ) { + if( _rcList.contains(colID) ) { Long mvVal2 = meta.getRecodeMap(colID-1).get(mvVal); if( mvVal2 == null) throw new RuntimeException("Missing recode value for impute value '"+mvVal+"'."); @@ -1013,4 +998,12 @@ public class MVImputeAgent extends Encoder } } } + + /** + * + * @param rcList + */ + public void initRecodeIDList(List<Integer> rcList) { + _rcList = rcList; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java index 539ff30..186ac7a 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java @@ -94,6 +94,16 @@ public class OmitAgent extends Encoder } @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + return apply(in, out); + } + + @Override + public void build(FrameBlock in) { + //do nothing + } + + @Override public String[] apply(String[] words) { return null; } @@ -133,29 +143,14 @@ public class OmitAgent extends Encoder } @Override - public double[] encode(String[] in, double[] out) { - return null; - } - - @Override - public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - return apply(in, out); - } - - @Override - public void build(String[] in) { - //do nothing - } - - @Override - public void build(FrameBlock in) { + public FrameBlock getMetaData(FrameBlock out) { //do nothing + return out; } - + @Override - public FrameBlock getMetaData(FrameBlock out) { + public void initMetaData(FrameBlock meta) { //do 
nothing - return out; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java index a44ad5a..01d7c85 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java @@ -88,19 +88,6 @@ public class RecodeAgent extends Encoder } } - /** - * Construct the recodemaps from the given input frame for all - * columns registered for recode. - * - * @param frame - */ - public void initRecodeMaps( FrameBlock frame ) { - for( int j=0; j<_colList.length; j++ ) { - int colID = _colList[j]; //1-based - _rcdMaps.put(colID, frame.getRecodeMap(colID-1)); - } - } - public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { return _rcdMaps; } @@ -382,6 +369,56 @@ public class RecodeAgent extends Encoder } /** + * + * @param colID + * @param key + * @return + */ + private String lookupRCDMap(int colID, String key) { + if( _finalMaps!=null ) + return _finalMaps.get(colID).get(key); + else { //used for cp + Long tmp = _rcdMaps.get(colID).get(key); + return (tmp!=null) ? 
Long.toString(tmp) : null; + } + } + + + @Override + public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + if( !isApplicable() ) + return out; + + //build and apply recode maps + build(in); + apply(in, out); + + return out; + } + + @Override + public void build(FrameBlock in) { + if( !isApplicable() ) + return; + + Iterator<String[]> iter = in.getStringRowIterator(); + while( iter.hasNext() ) { + String[] row = iter.next(); + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based + //allocate column map if necessary + if( !_rcdMaps.containsKey(colID) ) + _rcdMaps.put(colID, new HashMap<String,Long>()); + //probe and build column map + HashMap<String,Long> map = _rcdMaps.get(colID); + String key = row[colID-1]; + if( !map.containsKey(key) ) + map.put(key, new Long(map.size()+1)); + } + } + } + + /** * Method to apply transformations. * * @param words @@ -422,81 +459,11 @@ public class RecodeAgent extends Encoder return out; } - - /** - * - * @param colID - * @param key - * @return - */ - private String lookupRCDMap(int colID, String key) { - if( _finalMaps!=null ) - return _finalMaps.get(colID).get(key); - else { //used for cp - Long tmp = _rcdMaps.get(colID).get(key); - return (tmp!=null) ? 
Long.toString(tmp) : null; - } - } - - @Override - public double[] encode(String[] in, double[] out) { - if( !isApplicable() ) - return out; - - //build and apply recode maps - build(in); - apply(in); - - //convert to double - for( int j=0; j<_colList.length; j++ ) - out[_colList[j]-1] = Double.parseDouble(in[_colList[j]-1]); - return out; - } - - @Override - public MatrixBlock encode(FrameBlock in, MatrixBlock out) { - if( !isApplicable() ) - return out; - - //build and apply recode maps - build(in); - apply(in, out); - - return out; - } - - @Override - public void build(String[] in) { - if( !isApplicable() ) - return; - - for( int j=0; j<_colList.length; j++ ) { - int colID = _colList[j]; //1-based - //allocate column map if necessary - if( !_rcdMaps.containsKey(colID) ) - _rcdMaps.put(colID, new HashMap<String,Long>()); - //probe and build column map - HashMap<String,Long> map = _rcdMaps.get(colID); - String key = in[colID-1]; - if( !map.containsKey(key) ) - map.put(key, new Long(map.size()+1)); - } - } - - @Override - public void build(FrameBlock in) { - if( !isApplicable() ) - return; - - Iterator<String[]> iter = in.getStringRowIterator(); - while( iter.hasNext() ) - build( iter.next() ); - } @Override - public FrameBlock getMetaData(FrameBlock out) { + public FrameBlock getMetaData(FrameBlock meta) { if( !isApplicable() ) - return out; + return meta; //inverse operation to initRecodeMaps @@ -505,7 +472,7 @@ public class RecodeAgent extends Encoder for( int j=0; j<_colList.length; j++ ) if( _rcdMaps.containsKey(_colList[j]) ) maxDistinct = Math.max(maxDistinct, _rcdMaps.get(_colList[j]).size()); - out.ensureAllocatedColumns(maxDistinct); + meta.ensureAllocatedColumns(maxDistinct); //create compact meta data representation for( int j=0; j<_colList.length; j++ ) { @@ -513,12 +480,41 @@ public class RecodeAgent extends Encoder int rowID = 0; if( _rcdMaps.containsKey(_colList[j]) ) for( Entry<String, Long> e : _rcdMaps.get(colID).entrySet() ) { - String tmp = 
e.getKey() + Lop.DATATYPE_PREFIX + e.getValue().toString(); - out.set(rowID++, colID-1, tmp); + String tmp = constructRecodeMapEntry(e.getKey(), e.getValue()); + meta.set(rowID++, colID-1, tmp); } + meta.getColumnMetadata(colID-1).setNumDistinct( + _rcdMaps.get(colID).size()); } - return out; + return meta; + } + + + /** + * Construct the recodemaps from the given input frame for all + * columns registered for recode. + * + * @param frame + */ + public void initMetaData( FrameBlock meta ) { + if( meta == null || meta.getNumRows()<=0 ) + return; + + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; //1-based + _rcdMaps.put(colID, meta.getRecodeMap(colID-1)); + } + } + + /** + * + * @param token + * @param code + * @return + */ + public static String constructRecodeMapEntry(String token, Long code) { + return token + Lop.DATATYPE_PREFIX + code.toString(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java index f66ed5e..6f7c7ee 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java @@ -19,6 +19,7 @@ package org.apache.sysml.runtime.transform.decode; +import java.io.Serializable; import java.util.List; import org.apache.sysml.parser.Expression.ValueType; @@ -30,25 +31,19 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; * interface for decoding matrices to frames. 
* */ -public abstract class Decoder +public abstract class Decoder implements Serializable { - protected List<ValueType> _schema = null; + private static final long serialVersionUID = -1732411001366177787L; - protected Decoder( List<ValueType> schema ) { + protected List<ValueType> _schema = null; + protected int[] _colList = null; + + protected Decoder( List<ValueType> schema, int[] colList ) { _schema = schema; + _colList = colList; } /** - * Row decode API converting a matrix row into a frame row - * of the specified decoder schema. - * - * @param in - * @param out - * @return - */ - public abstract Object[] decode(double[] in, Object[] out); - - /** * Block decode API converting a matrix block into a frame block. * * @param in @@ -57,4 +52,10 @@ public abstract class Decoder * @return returns given output frame block for convenience */ public abstract FrameBlock decode(MatrixBlock in, FrameBlock out); + + /** + * + * @param meta + */ + public abstract void initMetaData(FrameBlock meta); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java index b0b0909..a8f9233 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java @@ -34,29 +34,30 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; */ public class DecoderComposite extends Decoder { + private static final long serialVersionUID = 5790600547144743716L; + private List<Decoder> _decoders = null; protected DecoderComposite(List<ValueType> schema, List<Decoder> decoders) { - super(schema); + super(schema, null); _decoders = decoders; } protected 
DecoderComposite(List<ValueType> schema, Decoder[] decoders) { - super(schema); + super(schema, null); _decoders = Arrays.asList(decoders); } @Override - public Object[] decode(double[] in, Object[] out) { + public FrameBlock decode(MatrixBlock in, FrameBlock out) { for( Decoder decoder : _decoders ) - decoder.decode(in, out); + out = decoder.decode(in, out); return out; } - + @Override - public FrameBlock decode(MatrixBlock in, FrameBlock out) { + public void initMetaData(FrameBlock meta) { for( Decoder decoder : _decoders ) - decoder.decode(in, out); - return out; + decoder.initMetaData(meta); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java new file mode 100644 index 0000000..2916742 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.transform.decode; + +import java.util.List; + +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.UtilFunctions; + +/** + * Simple atomic decoder for dummycoded columns. This decoder builds internally + * inverted column mappings from the given frame meta data. + * + */ +public class DecoderDummycode extends Decoder +{ + private static final long serialVersionUID = 4758831042891032129L; + + private int[] _clPos = null; + private int[] _cuPos = null; + + protected DecoderDummycode(List<ValueType> schema, int[] dcCols) { + //dcCols refers to column IDs in output (non-dc) + super(schema, dcCols); + } + + @Override + public FrameBlock decode(MatrixBlock in, FrameBlock out) { + out.ensureAllocatedColumns(in.getNumRows()); + for( int i=0; i<in.getNumRows(); i++ ) + for( int j=0; j<_colList.length; j++ ) + for( int k=_clPos[j]; k<_cuPos[j]; k++ ) + if( in.quickGetValue(i, k-1) != 0 ) { + int col = _colList[j] - 1; + out.set(i, col, UtilFunctions.doubleToObject( + out.getSchema().get(col), k-_clPos[j]+1)); + } + return out; + } + + @Override + public void initMetaData(FrameBlock meta) { + _clPos = new int[_colList.length]; //col lower pos + _cuPos = new int[_colList.length]; //col upper pos + for( int j=0, off=0; j<_colList.length; j++ ) { + int colID = _colList[j]; + int ndist = (int)meta.getColumnMetadata() + .get(colID-1).getNumDistinct(); + _clPos[j] = off + colID; + _cuPos[j] = _clPos[j] + ndist; + off += ndist - 1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java index d0e04f4..78f1ad2 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java @@ -20,16 +20,18 @@ package org.apache.sysml.runtime.transform.decode; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.util.UtilFunctions; -import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONObject; @@ -43,6 +45,7 @@ public class DecoderFactory * @return * @throws DMLRuntimeException */ + @SuppressWarnings("unchecked") public static Decoder createDecoder(String spec, List<ValueType> schema, FrameBlock meta) throws DMLRuntimeException { @@ -59,44 +62,34 @@ public class DecoderFactory schema = Collections.nCopies(meta.getNumColumns(), ValueType.STRING); } - //create decoders 'recode' and 'pass-through' - if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE)) { - JSONArray attrs = null; - if( jSpec.get(TfUtils.TXMETHOD_RECODE) instanceof JSONObject ) { - JSONObject obj = (JSONObject) jSpec.get(TfUtils.TXMETHOD_RECODE); - attrs = (JSONArray) obj.get(TfUtils.JSON_ATTRS); - } - else - attrs = (JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE); - - //recode decoder - int[] rcCols = new int[attrs.size()]; - for(int j=0; j<rcCols.length; j++) - rcCols[j] = UtilFunctions.toInt(attrs.get(j))-1; - ldecoders.add(new DecoderRecode(schema, meta, rcCols)); - - //pass-through decode (non-recode columns) - if( 
schema.size() > attrs.size() ) { - int[] ptCols = new int[schema.size()-attrs.size()]; - HashSet<Integer> probe = new HashSet<Integer>(); - for( int j=0; j<rcCols.length; j++ ) - probe.add(rcCols[j]); - for( int j=0, pos=0; j<schema.size(); j++ ) - if( !probe.contains(j) ) - ptCols[pos++] = j; - ldecoders.add(new DecoderPassThrough(schema, ptCols)); - } + //create decoders 'recode', 'dummy' and 'pass-through' + List<Integer> rcIDs = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_RECODE))); + List<Integer> dcIDs = Arrays.asList(ArrayUtils.toObject( + TfMetaUtils.parseJsonIDList(jSpec, TfUtils.TXMETHOD_DUMMYCODE))); + rcIDs = new ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs)); + List<Integer> ptIDs = new ArrayList<Integer>(CollectionUtils + .subtract(UtilFunctions.getSequenceList(1, schema.size(), 1), rcIDs)); + + if( !dcIDs.isEmpty() ) { + ldecoders.add(new DecoderDummycode(schema, + ArrayUtils.toPrimitive(dcIDs.toArray(new Integer[0])))); + } + if( !rcIDs.isEmpty() ) { + ldecoders.add(new DecoderRecode(schema, !dcIDs.isEmpty(), + ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0])))); } - //create full 'pass-through' decoder if necessary - else { - int[] ptCols = new int[schema.size()]; - for( int j=0; j<ptCols.length; j++ ) - ptCols[j] = j; - ldecoders.add(new DecoderPassThrough(schema, ptCols)); + if( !ptIDs.isEmpty() ) { + ldecoders.add(new DecoderPassThrough(schema, + ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), + ArrayUtils.toPrimitive(dcIDs.toArray(new Integer[0])))); } //create composite decoder of all created decoders + //and initialize with given meta data (recode, dummy, bin) decoder = new DecoderComposite(schema, ldecoders); + if( meta != null ) + decoder.initMetaData(meta); } catch(Exception ex) { throw new DMLRuntimeException(ex); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java index 0333504..d2bf7fa 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java @@ -33,30 +33,52 @@ import org.apache.sysml.runtime.util.UtilFunctions; */ public class DecoderPassThrough extends Decoder { - private int[] _ptCols = null; //0-based + private static final long serialVersionUID = -8525203889417422598L; - protected DecoderPassThrough(List<ValueType> schema, int[] ptCols) { - super(schema); - _ptCols = ptCols; - } - - @Override - public Object[] decode(double[] in, Object[] out) { - for( int j=0; j<_ptCols.length; j++ ) - out[_ptCols[j]] = in[_ptCols[j]]; - return out; + private int[] _dcCols = null; + private int[] _srcCols = null; + + protected DecoderPassThrough(List<ValueType> schema, int[] ptCols, int[] dcCols) { + super(schema, ptCols); + _dcCols = dcCols; } @Override public FrameBlock decode(MatrixBlock in, FrameBlock out) { out.ensureAllocatedColumns(in.getNumRows()); for( int i=0; i<in.getNumRows(); i++ ) { - for( int j=0; j<_ptCols.length; j++ ) { - double val = in.quickGetValue(i, _ptCols[j]); - out.set(i, _ptCols[j], UtilFunctions.doubleToObject( - _schema.get(_ptCols[j]), val)); + for( int j=0; j<_colList.length; j++ ) { + int srcColID = _srcCols[j]; + int tgtColID = _colList[j]; + double val = in.quickGetValue(i, srcColID-1); + out.set(i, tgtColID-1, UtilFunctions.doubleToObject( + _schema.get(tgtColID-1), val)); } } return out; } + + @Override + public void initMetaData(FrameBlock meta) { + if( _dcCols.length > 0 ) { + //prepare source column id mapping w/ dummy coding + _srcCols = new int[_colList.length]; + int ix1 = 0, ix2 = 0, off = 0; + while( ix1<_colList.length ) { + if( 
ix2>=_dcCols.length || _colList[ix1] < _dcCols[ix2] ) { + _srcCols[ix1] = _colList[ix1] + off; + ix1 ++; + } + else { //_colList[ix1] > _dcCols[ix2] + off += (int)meta.getColumnMetadata() + .get(_dcCols[ix2]-1).getNumDistinct() - 1; + ix2 ++; + } + } + } + else { + //prepare direct source column mapping + _srcCols = _colList; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java index f554f8b..5484ded 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java +++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java @@ -37,54 +37,63 @@ import org.apache.sysml.runtime.util.UtilFunctions; */ public class DecoderRecode extends Decoder { - private int[] _rcCols = null; //0-based + private static final long serialVersionUID = -3784249774608228805L; + private HashMap<Long, Object>[] _rcMaps = null; + private boolean _onOut = false; - @SuppressWarnings("unchecked") - protected DecoderRecode(List<ValueType> schema, FrameBlock meta, int[] rcCols) { - super(schema); - - //initialize recode maps according to schema - _rcCols = rcCols; - _rcMaps = new HashMap[_rcCols.length]; - for( int j=0; j<_rcCols.length; j++ ) { - HashMap<Long, Object> map = new HashMap<Long, Object>(); - for( int i=0; i<meta.getNumRows(); i++ ) { - if( meta.get(i, _rcCols[j])==null ) - break; //reached end of recode map - String[] tmp = meta.get(i, _rcCols[j]).toString().split(Lop.DATATYPE_PREFIX); - Object obj = UtilFunctions.stringToObject(schema.get(_rcCols[j]), tmp[0]); - map.put(Long.parseLong(tmp[1]), obj); - } - _rcMaps[j] = map; - } + protected DecoderRecode(List<ValueType> schema, boolean onOut, int[] rcCols) { 
+ super(schema, rcCols); + _onOut = onOut; } @Override - public Object[] decode(double[] in, Object[] out) { - for( int j=0; j<_rcCols.length; j++ ) { - long key = UtilFunctions.toLong(in[_rcCols[j]]); - out[_rcCols[j]] = _rcMaps[j].get(key); + public FrameBlock decode(MatrixBlock in, FrameBlock out) { + if( _onOut ) { //recode on output (after dummy) + for( int i=0; i<in.getNumRows(); i++ ) { + for( int j=0; j<_colList.length; j++ ) { + int colID = _colList[j]; + double val = UtilFunctions.objectToDouble( + out.getSchema().get(colID-1), out.get(i, colID-1)); + long key = UtilFunctions.toLong(val); + out.set(i, colID-1, _rcMaps[j].get(key)); + } + } + } + else { //recode on input (no dummy) + out.ensureAllocatedColumns(in.getNumRows()); + for( int i=0; i<in.getNumRows(); i++ ) { + for( int j=0; j<_colList.length; j++ ) { + double val = in.quickGetValue(i, _colList[j]-1); + long key = UtilFunctions.toLong(val); + out.set(i, _colList[j]-1, _rcMaps[j].get(key)); + } + } } return out; } @Override - public FrameBlock decode(MatrixBlock in, FrameBlock out) { - out.ensureAllocatedColumns(in.getNumRows()); - for( int i=0; i<in.getNumRows(); i++ ) { - for( int j=0; j<_rcCols.length; j++ ) { - double val = in.quickGetValue(i, _rcCols[j]); - long key = UtilFunctions.toLong(val); - out.set(i, _rcCols[j], _rcMaps[j].get(key)); + @SuppressWarnings("unchecked") + public void initMetaData(FrameBlock meta) { + //initialize recode maps according to schema + _rcMaps = new HashMap[_colList.length]; + for( int j=0; j<_colList.length; j++ ) { + HashMap<Long, Object> map = new HashMap<Long, Object>(); + for( int i=0; i<meta.getNumRows(); i++ ) { + if( meta.get(i, _colList[j]-1)==null ) + break; //reached end of recode map + String[] tmp = meta.get(i, _colList[j]-1).toString().split(Lop.DATATYPE_PREFIX); + Object obj = UtilFunctions.stringToObject(_schema.get(_colList[j]-1), tmp[0]); + map.put(Long.parseLong(tmp[1]), obj); } + _rcMaps[j] = map; } - return out; } /** * Parses a line of 
<token, ID, count> into <token, ID> pairs, where - * quoted tokens (potentially including separators) are supportd. + * quoted tokens (potentially including separators) are supported. * * @param entry * @param pair http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java index ac01357..b5f81d0 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java @@ -110,15 +110,6 @@ public abstract class Encoder implements Serializable } /** - * Row encode: build and apply (transform encode). - * - * @param in - * @param out - * @return - */ - public abstract double[] encode(String[] in, double[] out); - - /** * Block encode: build and apply (transform encode). * * @param in @@ -128,14 +119,6 @@ public abstract class Encoder implements Serializable public abstract MatrixBlock encode(FrameBlock in, MatrixBlock out); /** - * Build the transform meta data for given row input. This call modifies - * and keeps meta data as encoder state. - * - * @param in - */ - public abstract void build(String[] in); - - /** * Build the transform meta data for the given block input. This call modifies * and keeps meta data as encoder state. * @@ -144,30 +127,38 @@ public abstract class Encoder implements Serializable public abstract void build(FrameBlock in); /** - * Construct a frame block out of the transform meta data. + * Encode input data blockwise according to existing transform meta + * data (transform apply). 
* + * @param in + * @param out * @return */ - public abstract FrameBlock getMetaData(FrameBlock out); + public abstract MatrixBlock apply(FrameBlock in, MatrixBlock out); /** * Encode input data according to existing transform meta * data (transform apply). + * TODO remove once file-based transform removed * * @param in * @return */ public abstract String[] apply(String[] in); - + /** - * Encode input data blockwise according to existing transform meta - * data (transform apply). + * Construct a frame block out of the transform meta data. * - * @param in - * @param out * @return */ - public abstract MatrixBlock apply(FrameBlock in, MatrixBlock out); + public abstract FrameBlock getMetaData(FrameBlock out); + + /** + * Sets up the required meta data for a subsequent call to apply. + * + * @param meta + */ + public abstract void initMetaData(FrameBlock meta); //OLD API: kept for a transition phase only http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java index 1c27350..bafa655 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.transform.DistinctValue; @@ -45,6 +46,7 @@ public class EncoderComposite extends Encoder private static final 
long serialVersionUID = -8473768154646831882L; private List<Encoder> _encoders = null; + private FrameBlock _meta = null; protected EncoderComposite(List<Encoder> encoders) { super(null, -1); @@ -63,25 +65,29 @@ public class EncoderComposite extends Encoder clen = Math.max(clen, encoder.getNumCols()); return clen; } - - @Override - public double[] encode(String[] in, double[] out) { - for( Encoder encoder : _encoders ) - out = encoder.encode(in, out); - return out; - } + public List<Encoder> getEncoders() { + return _encoders; + } + @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) { + //build meta data first (for all encoders) for( Encoder encoder : _encoders ) - out = encoder.encode(in, out); - return out; - } - - @Override - public void build(String[] in) { + encoder.build(in); + + //propagate meta data + _meta = new FrameBlock(in.getNumColumns(), ValueType.STRING); + for( Encoder encoder : _encoders ) { + encoder.initMetaData(_meta); + _meta = encoder.getMetaData(_meta); + } + + //apply meta data for( Encoder encoder : _encoders ) - encoder.build(in); + out = encoder.apply(in, out); + + return out; } @Override @@ -90,13 +96,7 @@ public class EncoderComposite extends Encoder encoder.build(in); } - @Override - public FrameBlock getMetaData(FrameBlock out) { - for( Encoder encoder : _encoders ) - encoder.getMetaData(out); - return out; - } - + @Override public String[] apply(String[] in) { for( Encoder encoder : _encoders ) @@ -110,6 +110,21 @@ public class EncoderComposite extends Encoder out = encoder.apply(in, out); return out; } + + @Override + public FrameBlock getMetaData(FrameBlock out) { + if( _meta != null ) + return _meta; + for( Encoder encoder : _encoders ) + encoder.getMetaData(out); + return out; + } + + @Override + public void initMetaData(FrameBlock out) { + for( Encoder encoder : _encoders ) + encoder.initMetaData(out); + } @Override public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int 
taskID, TfUtils agents) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java index c3959ca..ae98bae 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java @@ -105,38 +105,28 @@ public class EncoderFactory if( !rcIDs.isEmpty() ) { RecodeAgent ra = new RecodeAgent(jSpec, clen); ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0]))); - if( meta != null ) - ra.initRecodeMaps(meta); lencoders.add(ra); } - if( !ptIDs.isEmpty() ) { + if( !ptIDs.isEmpty() ) lencoders.add(new EncoderPassThrough( ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen)); - } - if( !dcIDs.isEmpty() ) { - DummycodeAgent da = new DummycodeAgent(jSpec, schema.size()); - if( meta != null ) - da.initDomainSizes(meta); - lencoders.add(da); - } - if( !binIDs.isEmpty() ) { - BinAgent ba = new BinAgent(jSpec, schema.size(), true); - if( meta != null ) - ba.initBins(meta); - lencoders.add(ba); - } - if( !oIDs.isEmpty() ) { + if( !dcIDs.isEmpty() ) + lencoders.add(new DummycodeAgent(jSpec, schema.size())); + if( !binIDs.isEmpty() ) + lencoders.add(new BinAgent(jSpec, schema.size(), true)); + if( !oIDs.isEmpty() ) lencoders.add(new OmitAgent(jSpec, schema.size())); - } if( !mvIDs.isEmpty() ) { MVImputeAgent ma = new MVImputeAgent(jSpec, schema.size()); - if( meta != null ) - ma.initReplacementList(meta, rcIDs); + ma.initRecodeIDList(rcIDs); lencoders.add(ma); } - //create composite decoder of all created decoders + //create composite decoder of all created encoders + //and initialize meta data (recode, dummy, bin, mv) encoder 
= new EncoderComposite(lencoders); + if( meta != null ) + encoder.initMetaData(meta); } catch(Exception ex) { throw new DMLRuntimeException(ex); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java index 445d019..ab146ce 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java @@ -49,37 +49,16 @@ public class EncoderPassThrough extends Encoder } @Override - public double[] encode(String[] in, double[] out) { - for( int j=0; j<_colList.length; j++ ) { - String tmp = in[_colList[j]-1]; - out[_colList[j]-1] = (tmp==null) ? 0 : - Double.parseDouble(tmp); - } - - return out; - } - - @Override public MatrixBlock encode(FrameBlock in, MatrixBlock out) { return apply(in, out); } @Override - public void build(String[] in) { - //do nothing - } - - @Override public void build(FrameBlock in) { //do nothing } @Override - public FrameBlock getMetaData(FrameBlock out) { - return null; - } - - @Override public String[] apply(String[] in) { return in; } @@ -101,6 +80,18 @@ public class EncoderPassThrough extends Encoder } @Override + public FrameBlock getMetaData(FrameBlock meta) { + //do nothing + return meta; + } + + @Override + public void initMetaData(FrameBlock meta) { + //do nothing + } + + + @Override public void mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> out, int taskID, TfUtils agents) throws IOException { throw new RuntimeException("File-based api not supported."); } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java index b9feb5d..82396f6 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java +++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java @@ -132,7 +132,10 @@ public class TfMetaUtils for(int j=0; j<colspecs.size(); j++) { JSONObject colspec = (JSONObject) colspecs.get(j); colList[j] = colspec.getInt("id"); - } + } + + //ensure ascending order of column IDs + Arrays.sort(colList); } return colList;
