[SYSTEMML-1300] Remove file-based transform from compiler/runtime

This patch removes the old (deprecated) file-based transform in favor of the new frame-based transform. In detail, this includes the removal of the existing cp/sp/mr file-based implementations, the entire transform language/compiler/runtime integration, and transform-specific tests, as well as a refactoring of existing encoders and various cleanups.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0cd3905f Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0cd3905f Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0cd3905f Branch: refs/heads/master Commit: 0cd3905f592b5ee0c867ebc952d9ba367fb8a0c9 Parents: 8a27573 Author: Matthias Boehm <[email protected]> Authored: Wed Jun 7 22:12:01 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Jun 8 12:24:12 2017 -0700 ---------------------------------------------------------------------- src/main/java/org/apache/sysml/hops/Hop.java | 11 +- .../org/apache/sysml/hops/OptimizerUtils.java | 10 - .../sysml/hops/ParameterizedBuiltinOp.java | 20 - .../apache/sysml/hops/recompile/Recompiler.java | 21 - .../sysml/hops/rewrite/HopRewriteUtils.java | 17 - .../rewrite/RewriteBlockSizeAndReblock.java | 26 +- .../RewriteInjectSparkPReadCheckpointing.java | 5 +- .../rewrite/RewriteSplitDagUnknownCSVRead.java | 3 +- .../java/org/apache/sysml/lops/CSVReBlock.java | 41 +- src/main/java/org/apache/sysml/lops/Data.java | 25 +- .../apache/sysml/lops/ParameterizedBuiltin.java | 64 +- src/main/java/org/apache/sysml/lops/Unary.java | 10 +- .../java/org/apache/sysml/lops/compile/Dag.java | 58 +- .../org/apache/sysml/lops/compile/JobType.java | 18 +- .../apache/sysml/lops/runtime/RunMRJobs.java | 34 +- .../org/apache/sysml/parser/DMLTranslator.java | 7 - .../org/apache/sysml/parser/Expression.java | 2 +- .../ParameterizedBuiltinFunctionExpression.java | 68 +- .../functionobjects/ParameterizedBuiltin.java | 6 +- .../instructions/CPInstructionParser.java | 1 - .../instructions/MRInstructionParser.java | 1 - .../instructions/SPInstructionParser.java | 1 - .../cp/ParameterizedBuiltinCPInstruction.java | 15 +- .../runtime/instructions/mr/MRInstruction.java | 2 +- ...ReturnParameterizedBuiltinSPInstruction.java | 34 +- .../ParameterizedBuiltinSPInstruction.java | 14 - 
.../instructions/spark/WriteSPInstruction.java | 70 +- .../sysml/runtime/matrix/CSVReblockMR.java | 21 +- .../matrix/mapred/CSVAssignRowIDMapper.java | 19 +- .../matrix/mapred/MRJobConfiguration.java | 27 - .../sysml/runtime/transform/ApplyTfBBMR.java | 155 -- .../runtime/transform/ApplyTfBBMapper.java | 157 -- .../sysml/runtime/transform/ApplyTfCSVMR.java | 129 -- .../runtime/transform/ApplyTfCSVMapper.java | 113 -- .../runtime/transform/ApplyTfCSVSPARK.java | 164 -- .../sysml/runtime/transform/BinAgent.java | 382 ----- .../sysml/runtime/transform/DataTransform.java | 1496 ------------------ .../sysml/runtime/transform/DistinctValue.java | 105 -- .../sysml/runtime/transform/DummycodeAgent.java | 461 ------ .../sysml/runtime/transform/GTFMTDMapper.java | 111 -- .../sysml/runtime/transform/GTFMTDReducer.java | 127 -- .../sysml/runtime/transform/GenTfMtdMR.java | 105 -- .../sysml/runtime/transform/GenTfMtdSPARK.java | 240 --- .../sysml/runtime/transform/MVImputeAgent.java | 1046 ------------ .../sysml/runtime/transform/OmitAgent.java | 148 -- .../sysml/runtime/transform/RecodeAgent.java | 534 ------- .../apache/sysml/runtime/transform/TfUtils.java | 446 +----- .../sysml/runtime/transform/encode/Encoder.java | 17 - .../runtime/transform/encode/EncoderBin.java | 188 +++ .../transform/encode/EncoderComposite.java | 25 - .../transform/encode/EncoderDummycode.java | 139 ++ .../transform/encode/EncoderFactory.java | 16 +- .../transform/encode/EncoderMVImpute.java | 422 +++++ .../runtime/transform/encode/EncoderOmit.java | 123 ++ .../transform/encode/EncoderPassThrough.java | 25 - .../runtime/transform/encode/EncoderRecode.java | 253 +++ .../runtime/transform/meta/TfMetaUtils.java | 1 - .../functions/frame/FrameFunctionTest.java | 3 - .../functions/frame/FrameMatrixReblockTest.java | 6 - .../functions/frame/FrameMetaReadWriteTest.java | 6 - .../transform/FrameCSVReadWriteTest.java | 4 - .../functions/transform/RunTest.java | 268 ---- .../functions/transform/ScalingTest.java 
| 244 --- .../transform/TransformAndApplyTest.java | 143 -- .../TransformCSVFrameEncodeDecodeTest.java | 6 +- .../TransformCSVFrameEncodeReadTest.java | 6 +- .../transform/TransformFrameApplyTest.java | 194 --- .../TransformFrameEncodeApplyTest.java | 4 - .../TransformFrameEncodeDecodeTest.java | 4 - .../TransformFrameEncodeDecodeTokenTest.java | 4 - .../transform/TransformReadMetaTest.java | 205 --- .../functions/transform/TransformTest.java | 709 --------- src/test/scripts/functions/transform/Apply.dml | 30 - .../scripts/functions/transform/ApplyFrame.dml | 29 - src/test/scripts/functions/transform/Scaling.R | 36 - .../scripts/functions/transform/Scaling.dml | 31 - .../scripts/functions/transform/Transform.dml | 31 - .../functions/transform/TransformAndApply.dml | 37 - .../transform/TransformAndApplySpecX.json | 5 - .../transform/TransformAndApplySpecY.json | 5 - .../functions/transform/TransformReadMeta.dml | 33 - .../functions/transform/TransformReadMeta2.dml | 33 - .../transform/TransformReadMetaSpecX.json | 5 - .../functions/transform/Transform_colnames.dml | 32 - .../functions/transform/ZPackageSuite.java | 6 - 85 files changed, 1209 insertions(+), 8689 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/Hop.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java index 6c7089b..31e3aa6 100644 --- a/src/main/java/org/apache/sysml/hops/Hop.java +++ b/src/main/java/org/apache/sysml/hops/Hop.java @@ -288,11 +288,9 @@ public abstract class Hop try { - if( (this instanceof DataOp // CSV - && ((DataOp)this).getDataOpType() == DataOpTypes.PERSISTENTREAD - && ((DataOp)this).getInputFormatType() == FileFormatTypes.CSV ) - || (this instanceof ParameterizedBuiltinOp - && ((ParameterizedBuiltinOp)this).getOp() == 
ParamBuiltinOp.TRANSFORM) ) + if( this instanceof DataOp // CSV + && ((DataOp)this).getDataOpType() == DataOpTypes.PERSISTENTREAD + && ((DataOp)this).getInputFormatType() == FileFormatTypes.CSV ) { reblock = new CSVReBlock( input, getRowsInBlock(), getColsInBlock(), getDataType(), getValueType(), et); @@ -1038,7 +1036,7 @@ public abstract class Hop public enum ParamBuiltinOp { INVALID, CDF, INVCDF, GROUPEDAGG, RMEMPTY, REPLACE, REXPAND, - TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA, + TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA, TOSTRING }; @@ -1298,7 +1296,6 @@ public abstract class Hop HopsParameterizedBuiltinLops.put(ParamBuiltinOp.RMEMPTY, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.RMEMPTY); HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REPLACE, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REPLACE); HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REXPAND, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REXPAND); - HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORM, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM); HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMAPPLY, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMAPPLY); HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMDECODE, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMDECODE); HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMMETA, org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMMETA); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 0c1c838..a40e36c 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -137,11 +137,6 @@ public class OptimizerUtils public static boolean ALLOW_WORSTCASE_SIZE_EXPRESSION_EVALUATION = true; public static boolean ALLOW_RAND_JOB_RECOMPILE = true; - - /** - * Enables CP-side data transformation for small files. - */ - public static boolean ALLOW_TRANSFORM_RECOMPILE = true; /** * Enables parfor runtime piggybacking of MR jobs into the packed jobs for @@ -205,11 +200,6 @@ public class OptimizerUtils */ public static final boolean ALLOW_COMBINE_FILE_INPUT_FORMAT = true; - /** - * Enables automatic csv-binary block reblock. - */ - public static boolean ALLOW_FRAME_CSV_REBLOCK = true; - public static long GPU_MEMORY_BUDGET = -1; http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java index 74542f4..7bff4bd 100644 --- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java +++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java @@ -35,7 +35,6 @@ import org.apache.sysml.lops.GroupedAggregateM; import org.apache.sysml.lops.Lop; import org.apache.sysml.lops.LopProperties.ExecType; import org.apache.sysml.lops.LopsException; -import org.apache.sysml.lops.OutputParameters.Format; import org.apache.sysml.lops.PMMJ; import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; import org.apache.sysml.lops.ParameterizedBuiltin; @@ -201,20 +200,6 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop constructLopsRExpand(inputlops, et); break; } - case TRANSFORM: { - ExecType et = optFindExecType(); - - ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops, - HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et); - 
setOutputDimensions(pbilop); - setLineNumbers(pbilop); - // output of transform is always in CSV format - // to produce a blocked output, this lop must be - // fed into CSV Reblock lop. - pbilop.getOutputParameters().setFormat(Format.CSV); - setLops(pbilop); - break; - } case CDF: case INVCDF: case REPLACE: @@ -1084,11 +1069,6 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop } else { - if( _op == ParamBuiltinOp.TRANSFORM ) { - // force remote, at runtime cp transform triggered for small files. - return (_etype = REMOTE); - } - if ( OptimizerUtils.isMemoryBasedOptLevel() ) { _etype = findExecTypeByMemEstimate(); } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java index 9942035..c92b735 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java @@ -125,7 +125,6 @@ public class Recompiler //note that we scale this threshold up by the degree of available parallelism private static final long CP_REBLOCK_THRESHOLD_SIZE = (long)1024*1024*1024; private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = (long)256*1024*1024; - private static final long CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE = (long)1024*1024*1024; /** Local reused rewriter for dynamic rewrites during recompile */ @@ -1838,26 +1837,6 @@ public class Recompiler return (estFilesize < cpThreshold); } - public static boolean checkCPTransform(MRJobInstruction inst, MatrixObject[] inputs) - throws DMLRuntimeException, IOException - { - boolean ret = true; - - MatrixObject input = inputs[0]; // there can only be one input in TRANSFORM job - - Path path = new Path(input.getFileName()); - long sizeOnHDFS = 
MapReduceTool.getFilesizeOnHDFS(path); - - // dimensions are not checked here, since the worst case dimensions - // after transformations (with potential dummycoding) are typically unknown. - - if( sizeOnHDFS > CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE - || sizeOnHDFS*4 > OptimizerUtils.getLocalMemBudget() ) - ret = false; - LOG.info("checkCPTransform(): size = " + sizeOnHDFS + ", recompile to CP = " + ret); - return ret; - } - public static boolean checkCPDataGen( MRJobInstruction inst, String updatedRandInst ) throws DMLRuntimeException { http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java index b406bb7..bec7b38 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java @@ -982,23 +982,6 @@ public class HopRewriteUtils return ret; } - public static boolean hasTransformParents( Hop hop ) - { - boolean ret = false; - - ArrayList<Hop> parents = hop.getParent(); - for( Hop p : parents ) - { - if( p instanceof ParameterizedBuiltinOp - && ((ParameterizedBuiltinOp)p).getOp()==ParamBuiltinOp.TRANSFORM) { - ret = true; - } - } - - - return ret; - } - public static boolean alwaysRequiresReblock(Hop hop) { return ( hop instanceof DataOp http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java index 2e2f91f..245a333 100644 --- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java @@ -28,11 +28,8 @@ import org.apache.sysml.hops.DataOp; import org.apache.sysml.hops.FunctionOp; import org.apache.sysml.hops.Hop; import org.apache.sysml.hops.OptimizerUtils; -import org.apache.sysml.hops.Hop.DataOpTypes; import org.apache.sysml.hops.Hop.FileFormatTypes; -import org.apache.sysml.hops.Hop.ParamBuiltinOp; import org.apache.sysml.hops.HopsException; -import org.apache.sysml.hops.ParameterizedBuiltinOp; import org.apache.sysml.parser.Expression.DataType; /** @@ -98,7 +95,7 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule if( canReblock && ( (dop.getDataType() == DataType.MATRIX && (dop.getRowsInBlock() != blocksize || dop.getColsInBlock() != blocksize)) ||(dop.getDataType() == DataType.FRAME && OptimizerUtils.isSparkExecutionMode() && (dop.getInputFormatType()==FileFormatTypes.TEXT - || dop.getInputFormatType()==FileFormatTypes.CSV && OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK))) ) + || dop.getInputFormatType()==FileFormatTypes.CSV))) ) { if( dop.getDataOpType() == DataOp.DataOpTypes.PERSISTENTREAD) { @@ -146,27 +143,6 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule } } } - //TODO remove once transform rebased to frames - else if ( (hop instanceof ParameterizedBuiltinOp && ((ParameterizedBuiltinOp)hop).getOp() == ParamBuiltinOp.TRANSFORM) ) { - - // check if there exists a non-csv-write output. 
If yes, add reblock - boolean rblk = false; - for(Hop out : hop.getParent()) - { - if ( !(out instanceof DataOp - && ((DataOp)out).getDataOpType() == DataOpTypes.PERSISTENTWRITE - && ((DataOp)out).getInputFormatType() == FileFormatTypes.CSV) ) - { - rblk = true; - break; - } - } - if ( rblk ) - { - hop.setRequiresReblock(true); - hop.setOutputBlocksizes(blocksize, blocksize); - } - } else //NO DATAOP { // TODO: following two lines are commented, and the subsequent hack is used instead! http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java index 9aceab4..755c2e8 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java @@ -68,9 +68,8 @@ public class RewriteInjectSparkPReadCheckpointing extends HopRewriteRule return; // The reblocking is performed after transform, and hence checkpoint only non-transformed reads. 
- if( (hop instanceof DataOp && ((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD && !HopRewriteUtils.hasTransformParents(hop)) - || (hop.requiresReblock()) - ) + if( (hop instanceof DataOp && ((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD) + || hop.requiresReblock() ) { //make given hop for checkpointing (w/ default storage level) //note: we do not recursively process childs here in order to prevent unnecessary checkpoints http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java index ee7b9c6..8396813 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java @@ -146,8 +146,7 @@ public class RewriteSplitDagUnknownCSVRead extends StatementBlockRewriteRule if( dop.getDataOpType() == DataOpTypes.PERSISTENTREAD && dop.getInputFormatType() == FileFormatTypes.CSV && !dop.dimsKnown() - && !HopRewriteUtils.hasOnlyWriteParents(dop, true, false) - && !HopRewriteUtils.hasTransformParents(hop) ) + && !HopRewriteUtils.hasOnlyWriteParents(dop, true, false) ) { cand.add(dop); } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/CSVReBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/CSVReBlock.java b/src/main/java/org/apache/sysml/lops/CSVReBlock.java index a15e036..55571f7 100644 --- a/src/main/java/org/apache/sysml/lops/CSVReBlock.java +++ b/src/main/java/org/apache/sysml/lops/CSVReBlock.java @@ -21,12 +21,10 @@ package org.apache.sysml.lops; import 
org.apache.sysml.lops.LopProperties.ExecLocation; import org.apache.sysml.lops.LopProperties.ExecType; -import org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes; import org.apache.sysml.lops.compile.JobType; import org.apache.sysml.parser.DataExpression; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; -import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression; /** @@ -55,18 +53,7 @@ public class CSVReBlock extends Lop boolean breaksAlignment = false; boolean aligner = false; boolean definesMRJob = true; - - // If the input to reblock is a tranform, then piggyback it along with transform - if ( input instanceof ParameterizedBuiltin - && ((ParameterizedBuiltin)input).getOp() == OperationTypes.TRANSFORM ) - { - definesMRJob = false; - lps.addCompatibility(JobType.TRANSFORM); - } - else - { - lps.addCompatibility(JobType.CSV_REBLOCK); - } + lps.addCompatibility(JobType.CSV_REBLOCK); if(et == ExecType.MR) { this.lps.setProperties( inputs, ExecType.MR, ExecLocation.MapAndReduce, breaksAlignment, aligner, definesMRJob ); @@ -88,19 +75,16 @@ public class CSVReBlock extends Lop private String prepCSVProperties() throws LopsException { StringBuilder sb = new StringBuilder(); - boolean isSparkTransformInput = false; - Data dataInput = null; - if(getInputs().get(0).getType() == Type.Data) - dataInput = (Data)getInputs().get(0); - else if ( getInputs().get(0).getType() == Type.ParameterizedBuiltin && ((ParameterizedBuiltin)getInputs().get(0)).getOp() == OperationTypes.TRANSFORM) { - isSparkTransformInput = (getExecType() == ExecType.SPARK); - dataInput = (Data) ((ParameterizedBuiltin)getInputs().get(0)).getNamedInput(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_DATA); - } + Data dataInput = (Data)getInputs().get(0); - Lop headerLop = dataInput.getNamedInputLop(DataExpression.DELIM_HAS_HEADER_ROW, String.valueOf(DataExpression.DEFAULT_DELIM_HAS_HEADER_ROW)); - Lop delimLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_DELIMITER, DataExpression.DEFAULT_DELIM_DELIMITER); - Lop fillLop = dataInput.getNamedInputLop(DataExpression.DELIM_FILL, String.valueOf(DataExpression.DEFAULT_DELIM_FILL)); - Lop fillValueLop = dataInput.getNamedInputLop(DataExpression.DELIM_FILL_VALUE, String.valueOf(DataExpression.DEFAULT_DELIM_FILL_VALUE)); + Lop headerLop = dataInput.getNamedInputLop(DataExpression.DELIM_HAS_HEADER_ROW, + String.valueOf(DataExpression.DEFAULT_DELIM_HAS_HEADER_ROW)); + Lop delimLop = dataInput.getNamedInputLop(DataExpression.DELIM_DELIMITER, + DataExpression.DEFAULT_DELIM_DELIMITER); + Lop fillLop = dataInput.getNamedInputLop(DataExpression.DELIM_FILL, + String.valueOf(DataExpression.DEFAULT_DELIM_FILL)); + Lop fillValueLop = dataInput.getNamedInputLop(DataExpression.DELIM_FILL_VALUE, + String.valueOf(DataExpression.DEFAULT_DELIM_FILL_VALUE)); if (headerLop.isVariable()) throw new LopsException(this.printErrorLocation() @@ -119,10 +103,7 @@ public class CSVReBlock extends Lop + "Parameter " + DataExpression.DELIM_FILL_VALUE + " must be a literal."); - // Output from transform() does not have a header - // On MR, reblock is piggybacked along with transform, and hence - // specific information about header needn't be passed through instruction - sb.append( ((Data)headerLop).getBooleanValue() && !isSparkTransformInput ); + sb.append( ((Data)headerLop).getBooleanValue() ); sb.append( OPERAND_DELIMITOR ); sb.append( ((Data)delimLop).getStringValue() ); sb.append( OPERAND_DELIMITOR ); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/Data.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Data.java b/src/main/java/org/apache/sysml/lops/Data.java index b8e40af..3d5db8a 100644 --- a/src/main/java/org/apache/sysml/lops/Data.java +++ b/src/main/java/org/apache/sysml/lops/Data.java @@ -139,18 +139,9 @@ public class Data 
extends Lop if ( getFileFormatType() == FileFormatTypes.CSV ) { - Lop input = getInputs().get(0); - // If the input is data transform, then csv write can be piggybacked onto TRANSFORM job. - // Otherwise, the input must be converted to csv format via WriteCSV MR job. - if ( input instanceof ParameterizedBuiltin - && ((ParameterizedBuiltin)input).getOp() == org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM ) { - lps.addCompatibility(JobType.TRANSFORM); - definesMRJob = false; - } - else { - lps.addCompatibility(JobType.CSV_WRITE); - definesMRJob = true; - } + // The input must be converted to csv format via WriteCSV MR job. + lps.addCompatibility(JobType.CSV_WRITE); + definesMRJob = true; } else { /* @@ -477,16 +468,8 @@ public class Data extends Lop if ( this.getExecType() == ExecType.SPARK ) { - boolean isInputMatrixBlock = true; - Lop input = getInputs().get(0); - if ( input instanceof ParameterizedBuiltin - && ((ParameterizedBuiltin)input).getOp() == ParameterizedBuiltin.OperationTypes.TRANSFORM ) { - // in the case of transform input, the input will be Text strings insteadof MatrixBlocks - // This information is used to have correct class information while accessing RDDs from the symbol table - isInputMatrixBlock = false; - } sb.append(OPERAND_DELIMITOR); - sb.append(isInputMatrixBlock); + sb.append(true); //isInputMatrixBlock } } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java index fdaf3c5..fb48d2f 100644 --- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java +++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java @@ -28,7 +28,6 @@ import org.apache.sysml.lops.LopProperties.ExecType; import org.apache.sysml.lops.compile.JobType; import 
org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; -import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression; /** @@ -40,7 +39,7 @@ public class ParameterizedBuiltin extends Lop public enum OperationTypes { CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, - TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA, + TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA, TOSTRING }; @@ -99,11 +98,6 @@ public class ParameterizedBuiltin extends Lop lps.addCompatibility(JobType.REBLOCK); breaksAlignment=true; } - else if ( _operation == OperationTypes.TRANSFORM && et == ExecType.MR ) { - definesMRJob = true; - eloc = ExecLocation.MapAndReduce; - lps.addCompatibility(JobType.TRANSFORM); - } else //executed in CP / CP_FILE / SPARK { eloc = ExecLocation.ControlProgram; @@ -212,8 +206,7 @@ public class ParameterizedBuiltin extends Lop } break; - - case TRANSFORM: + case TRANSFORMAPPLY: case TRANSFORMDECODE: case TRANSFORMMETA: { @@ -400,59 +393,6 @@ public class ParameterizedBuiltin extends Lop return sb.toString(); } - - @Override - public String getInstructions(int output_index) - throws LopsException - { - StringBuilder sb = new StringBuilder(); - sb.append( getExecType() ); - sb.append( Lop.OPERAND_DELIMITOR ); - - if(_operation== OperationTypes.TRANSFORM) - { - sb.append( "transform" ); - sb.append( OPERAND_DELIMITOR ); - - Lop iLop = _inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_DATA); - sb.append(iLop.prepInputOperand(getInputIndex("target"))); - sb.append( OPERAND_DELIMITOR ); - - Lop iLop2 = _inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_MTD); - sb.append(iLop2.prepScalarLabel()); - sb.append( OPERAND_DELIMITOR ); - - // apply transform - Lop iLop3a = _inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD); - if ( iLop3a != null ) { - sb.append("applymtd="); - sb.append(iLop3a.prepScalarLabel()); - sb.append( OPERAND_DELIMITOR ); - } - - // transform 
specification (transform: mandatory, transformapply: optional) - Lop iLop3b = _inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_SPEC); - if ( iLop3b != null ) { - sb.append("spec="); - sb.append(iLop3b.prepScalarLabel()); - sb.append( OPERAND_DELIMITOR ); - } - - Lop iLop4 = _inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_OUTNAMES); - if( iLop4 != null ) { - sb.append("outnames="); - sb.append(iLop4.prepScalarLabel()); - sb.append( OPERAND_DELIMITOR ); - } - - sb.append( prepOutputOperand(output_index)); - } - else - throw new LopsException(this.printErrorLocation() + "In ParameterizedBuiltin Lop, Unknown operation: " + _operation); - - return sb.toString(); - } - @Override public String toString() { http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/Unary.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Unary.java b/src/main/java/org/apache/sysml/lops/Unary.java index 087ffc4..cc53666 100644 --- a/src/main/java/org/apache/sysml/lops/Unary.java +++ b/src/main/java/org/apache/sysml/lops/Unary.java @@ -95,12 +95,11 @@ public class Unary extends Lop lps.addCompatibility(JobType.ANY); lps.removeNonPiggybackableJobs(); lps.removeCompatibility(JobType.CM_COV); // CM_COV allows only reducer instructions but this is MapOrReduce. TODO: piggybacking should be updated to take this extra constraint. 
- lps.removeCompatibility(JobType.TRANSFORM); - this.lps.setProperties(inputs, et, ExecLocation.MapOrReduce, breaksAlignment, aligner, definesMRJob); + lps.setProperties(inputs, et, ExecLocation.MapOrReduce, breaksAlignment, aligner, definesMRJob); } else { lps.addCompatibility(JobType.INVALID); - this.lps.setProperties(inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob); + lps.setProperties(inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob); } } @@ -150,12 +149,11 @@ public class Unary extends Lop lps.addCompatibility(JobType.ANY); lps.removeNonPiggybackableJobs(); lps.removeCompatibility(JobType.CM_COV); // CM_COV allows only reducer instructions but this is MapOrReduce. TODO: piggybacking should be updated to take this extra constraint. - lps.removeCompatibility(JobType.TRANSFORM); - this.lps.setProperties(inputs, et, ExecLocation.MapOrReduce, breaksAlignment, aligner, definesMRJob); + lps.setProperties(inputs, et, ExecLocation.MapOrReduce, breaksAlignment, aligner, definesMRJob); } else { lps.addCompatibility(JobType.INVALID); - this.lps.setProperties(inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob); + lps.setProperties(inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/compile/Dag.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java index 5816f3f..aab4303 100644 --- a/src/main/java/org/apache/sysml/lops/compile/Dag.java +++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java @@ -54,14 +54,12 @@ import org.apache.sysml.lops.MapMult; import org.apache.sysml.lops.OutputParameters; import org.apache.sysml.lops.OutputParameters.Format; import org.apache.sysml.lops.PMMJ; -import 
org.apache.sysml.lops.ParameterizedBuiltin; import org.apache.sysml.lops.PickByCount; import org.apache.sysml.lops.SortKeys; import org.apache.sysml.lops.Unary; import org.apache.sysml.parser.DataExpression; import org.apache.sysml.parser.Expression; import org.apache.sysml.parser.Expression.DataType; -import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression; import org.apache.sysml.parser.StatementBlock; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; @@ -2374,50 +2372,6 @@ public class Dag<N extends Lop> out.addLastInstruction(currInstr); } - else if(node instanceof ParameterizedBuiltin - && ((ParameterizedBuiltin)node).getOp() == org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM) { - - ParameterizedBuiltin pbi = (ParameterizedBuiltin)node; - Lop input = pbi.getNamedInput(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_DATA); - if(input.getDataType()== DataType.FRAME) { - - // Output of transform is in CSV format, which gets subsequently reblocked - // TODO: change it to output binaryblock - - Data dataInput = (Data) input; - oparams.setFile_name(getNextUniqueFilename()); - oparams.setLabel(getNextUniqueVarname(DataType.MATRIX)); - - // generate an instruction that creates a symbol table entry for the new variable in CSV format - Data delimLop = (Data) dataInput.getNamedInputLop( - DataExpression.DELIM_DELIMITER, DataExpression.DEFAULT_DELIM_DELIMITER); - - Instruction createvarInst = VariableCPInstruction.prepareCreateVariableInstruction( - oparams.getLabel(), oparams.getFile_name(), true, - DataType.MATRIX, OutputInfo.outputInfoToString(OutputInfo.CSVOutputInfo), - new MatrixCharacteristics(oparams.getNumRows(), oparams.getNumCols(), -1, -1, oparams.getNnz()), oparams.getUpdateType(), - false, delimLop.getStringValue(), true - ); - - createvarInst.setLocation(node); - - out.addPreInstruction(createvarInst); - - // temp file as well as the variable has 
to be deleted at the end - Instruction currInstr = VariableCPInstruction.prepareRemoveInstruction(oparams.getLabel()); - - currInstr.setLocation(node); - - out.addLastInstruction(currInstr); - - // finally, add the generated filename and variable name to the list of outputs - out.setFileName(oparams.getFile_name()); - out.setVarName(oparams.getLabel()); - } - else { - throw new LopsException("Input to transform() has an invalid type: " + input.getDataType() + ", it must be FRAME."); - } - } else if(!(node instanceof FunctionCallCP)) //general case { // generate temporary filename and a variable name to hold the @@ -2913,7 +2867,7 @@ public class Dag<N extends Lop> */ // - if ( jt != JobType.REBLOCK && jt != JobType.CSV_REBLOCK && jt != JobType.DATAGEN && jt != JobType.TRANSFORM) { + if ( jt != JobType.REBLOCK && jt != JobType.CSV_REBLOCK && jt != JobType.DATAGEN ) { for (int i=0; i < inputInfos.size(); i++) if ( inputInfos.get(i) == InputInfo.BinaryCellInputInfo || inputInfos.get(i) == InputInfo.TextCellInputInfo ) cellModeOverride = true; @@ -3129,9 +3083,7 @@ public class Dag<N extends Lop> } if (node.getExecLocation() == ExecLocation.Data ) { - if ( ((Data)node).getFileFormatType() == FileFormatTypes.CSV - && !(node.getInputs().get(0) instanceof ParameterizedBuiltin - && ((ParameterizedBuiltin)node.getInputs().get(0)).getOp() == org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM)) { + if ( ((Data)node).getFileFormatType() == FileFormatTypes.CSV ) { // Generate write instruction, which goes into CSV_WRITE Job int output_index = start_index[0]; shuffleInstructions.add(node.getInstructions(inputIndices.get(0), output_index)); @@ -3171,12 +3123,6 @@ public class Dag<N extends Lop> break; case ParameterizedBuiltin: - if( ((ParameterizedBuiltin)node).getOp() == org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM ) { - shuffleInstructions.add(node.getInstructions(output_index)); - if(DMLScript.ENABLE_DEBUG_MODE) { - 
MRJobLineNumbers.add(node._beginLine); - } - } break; /* Lop types that take two inputs */ http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/compile/JobType.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/compile/JobType.java b/src/main/java/org/apache/sysml/lops/compile/JobType.java index 0ca5b49..492be7e 100644 --- a/src/main/java/org/apache/sysml/lops/compile/JobType.java +++ b/src/main/java/org/apache/sysml/lops/compile/JobType.java @@ -22,7 +22,6 @@ package org.apache.sysml.lops.compile; import org.apache.sysml.hops.Hop.FileFormatTypes; import org.apache.sysml.lops.Lop; import org.apache.sysml.lops.Data; -import org.apache.sysml.lops.ParameterizedBuiltin; import org.apache.sysml.runtime.DMLRuntimeException; @@ -70,8 +69,7 @@ public enum JobType DATA_PARTITION (11, "DATAPARTITION", false, false, true), CSV_REBLOCK (12, "CSV_REBLOCK", false, false, false), CSV_WRITE (13, "CSV_WRITE", false, false, true), - TRANSFORM (14, "TRANSFORM", false, true, false), - GMRCELL (15, "GMRCELL", false, false, false); + GMRCELL (14, "GMRCELL", false, false, false); @@ -140,8 +138,6 @@ public enum JobType return Lop.Type.MMRJ; else if ( getName().equals("SORT") ) return Lop.Type.SortKeys; - else if ( getName().equals("TRANSFORM")) - return Lop.Type.ParameterizedBuiltin; else throw new DMLRuntimeException("Shuffle Lop Type is not defined for a job (" + getName() + ") that allows a single shuffle instruction."); } @@ -178,19 +174,13 @@ public enum JobType case CSVReBlock: return JobType.CSV_REBLOCK; - case ParameterizedBuiltin: - if( ((ParameterizedBuiltin)node).getOp() == ParameterizedBuiltin.OperationTypes.TRANSFORM ) - return JobType.TRANSFORM; - case Data: /* * Only Write LOPs with external data formats (except MatrixMarket) produce MR Jobs */ FileFormatTypes fmt = ((Data) node).getFileFormatType(); - if ( fmt == FileFormatTypes.CSV ) - return 
JobType.CSV_WRITE; - else - return null; + return ( fmt == FileFormatTypes.CSV ) ? + JobType.CSV_WRITE : null; default: return null; @@ -205,7 +195,7 @@ public enum JobType else { if ( getName().equals("MMCJ") ) return false; - else if ( getName().equals("MMRJ") || getName().equals("SORT") || getName().equals("TRANSFORM")) + else if ( getName().equals("MMRJ") || getName().equals("SORT")) return true; else throw new DMLRuntimeException("Implementation for isCompatibleWithParentNodes() is missing for a job (" + getName() + ") that allows a single shuffle instruction."); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java b/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java index 71de7c8..5cd0f60 100644 --- a/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java +++ b/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java @@ -22,7 +22,6 @@ package org.apache.sysml.lops.runtime; import java.io.IOException; import org.apache.hadoop.fs.Path; -import org.apache.wink.json4j.JSONException; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; @@ -69,7 +68,6 @@ import org.apache.sysml.runtime.matrix.data.LibMatrixDatagen; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.data.RandomMatrixGenerator; -import org.apache.sysml.runtime.transform.DataTransform; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.utils.Statistics; @@ -305,25 +303,7 @@ public class RunMRJobs case DATA_PARTITION: ret = DataPartitionMR.runJob(inst, inputMatrices, shuffleInst, inst.getIv_resultIndices(), outputMatrices, inst.getIv_numReducers(), inst.getIv_replication()); break; - - case TRANSFORM: - - if( 
ConfigurationManager.isDynamicRecompilation() - && OptimizerUtils.ALLOW_TRANSFORM_RECOMPILE - && DMLScript.rtplatform != RUNTIME_PLATFORM.HADOOP - && Recompiler.checkCPTransform( inst, inputMatrices ) ) - { - // transform the data and generate output in CSV format - ret = executeInMemoryTransform(inst, inputMatrices, outputMatrices); - Statistics.decrementNoOfExecutedMRJobs(); - execCP = true; - } - else - { - ret = DataTransform.mrDataTransform(inst, inputMatrices, shuffleInst, otherInst, inst.getIv_resultIndices(), outputMatrices, inst.getIv_numReducers(), inst.getIv_replication()); - } - break; - + default: throw new DMLRuntimeException("Invalid jobtype: " + inst.getJobType()); } @@ -360,11 +340,12 @@ public class RunMRJobs outputMatrices[i].setHDFSFileExists(true); - if ( inst.getJobType() != JobType.CSV_WRITE && inst.getJobType() != JobType.TRANSFORM) { + if ( inst.getJobType() != JobType.CSV_WRITE ) { // write out metadata file // Currently, valueType information in not stored in MR instruction, // since only DOUBLE matrices are supported ==> hard coded the value type information for now - MapReduceTool.writeMetaDataFile(fname + ".mtd", ValueType.DOUBLE, ((MatrixDimensionsMetaData)ret.getMetaData(i)).getMatrixCharacteristics(), outinfo); + MapReduceTool.writeMetaDataFile(fname + ".mtd", ValueType.DOUBLE, + ((MatrixDimensionsMetaData)ret.getMetaData(i)).getMatrixCharacteristics(), outinfo); } } } @@ -558,11 +539,4 @@ public class RunMRJobs return new JobReturn( mc, inst.getOutputInfos(), true); } - - private static JobReturn executeInMemoryTransform( MRJobInstruction inst, MatrixObject[] inputMatrices, MatrixObject[] outputMatrices) throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException { - return DataTransform.cpDataTransform( - inst.getIv_shuffleInstructions(), - inputMatrices, - outputMatrices); - } } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/parser/DMLTranslator.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java index a40c736..75f11b5 100644 --- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java +++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java @@ -1920,13 +1920,6 @@ public class DMLTranslator currBuiltinOp = new ReorgOp(target.getName(), target.getDataType(), target.getValueType(), ReOrgOp.SORT, inputs); break; - - case TRANSFORM: - currBuiltinOp = new ParameterizedBuiltinOp( - target.getName(), target.getDataType(), - target.getValueType(), ParamBuiltinOp.TRANSFORM, - paramHops); - break; case TRANSFORMAPPLY: currBuiltinOp = new ParameterizedBuiltinOp( http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/parser/Expression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/Expression.java b/src/main/java/org/apache/sysml/parser/Expression.java index 01a0a77..9ee3fba 100644 --- a/src/main/java/org/apache/sysml/parser/Expression.java +++ b/src/main/java/org/apache/sysml/parser/Expression.java @@ -139,7 +139,7 @@ public abstract class Expression GROUPEDAGG, RMEMPTY, REPLACE, ORDER, // Distribution Functions CDF, INVCDF, PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, PEXP, QEXP, - TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMENCODE, TRANSFORMMETA, + TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMENCODE, TRANSFORMMETA, TOSTRING, // The "toString" method for DML; named arguments accepted to format output INVALID }; http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java 
b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java index aa888d3..4a3da28 100644 --- a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java +++ b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.HashSet; import org.apache.sysml.hops.Hop.ParamBuiltinOp; -import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.parser.LanguageException.LanguageErrorCodes; @@ -39,8 +38,6 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier public static final String TF_FN_PARAM_MTD2 = "meta"; public static final String TF_FN_PARAM_SPEC = "spec"; public static final String TF_FN_PARAM_MTD = "transformPath"; //NOTE MB: for backwards compatibility - public static final String TF_FN_PARAM_APPLYMTD = "applyTransformPath"; - public static final String TF_FN_PARAM_OUTNAMES = "outputNames"; private static HashMap<String, Expression.ParameterizedBuiltinFunctionOp> opcodeMap; static { @@ -67,7 +64,6 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier opcodeMap.put("qexp", Expression.ParameterizedBuiltinFunctionOp.QEXP); // data transformation functions - opcodeMap.put("transform", Expression.ParameterizedBuiltinFunctionOp.TRANSFORM); opcodeMap.put("transformapply", Expression.ParameterizedBuiltinFunctionOp.TRANSFORMAPPLY); opcodeMap.put("transformdecode", Expression.ParameterizedBuiltinFunctionOp.TRANSFORMDECODE); opcodeMap.put("transformencode", Expression.ParameterizedBuiltinFunctionOp.TRANSFORMENCODE); @@ -222,15 +218,11 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier case ORDER: validateOrder(output, conditional); break; - - case TRANSFORM: - validateTransform(output, conditional); - break; case TRANSFORMAPPLY: validateTransformApply(output, conditional); break; - + case TRANSFORMDECODE: validateTransformDecode(output, conditional); break; @@ 
-289,64 +281,6 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier return; } - // example: A = transform(data=D, txmtd="", txspec="") - private void validateTransform(DataIdentifier output, boolean conditional) - throws LanguageException - { - //validate data - checkDataType("transform", TF_FN_PARAM_DATA, DataType.FRAME, conditional); - - Expression txmtd = getVarParam(TF_FN_PARAM_MTD); - if( txmtd==null ) { - raiseValidateError("Named parameter '" + TF_FN_PARAM_MTD + "' missing. Please specify the transformation metadata file path.", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } - else if( txmtd.getOutput().getDataType() != DataType.SCALAR || txmtd.getOutput().getValueType() != ValueType.STRING ){ - raiseValidateError("Transformation metadata file '" + TF_FN_PARAM_MTD + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } - - Expression txspec = getVarParam(TF_FN_PARAM_SPEC); - Expression applyMTD = getVarParam(TF_FN_PARAM_APPLYMTD); - if( txspec==null ) { - if ( applyMTD == null ) - raiseValidateError("Named parameter '" + TF_FN_PARAM_SPEC + "' missing. 
Please specify the transformation specification (JSON string).", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } - else if( txspec.getOutput().getDataType() != DataType.SCALAR || txspec.getOutput().getValueType() != ValueType.STRING ){ - raiseValidateError("Transformation specification '" + TF_FN_PARAM_SPEC + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } - - if ( applyMTD != null ) { - if( applyMTD.getOutput().getDataType() != DataType.SCALAR || applyMTD.getOutput().getValueType() != ValueType.STRING ){ - raiseValidateError("Apply transformation metadata file'" + TF_FN_PARAM_APPLYMTD + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } - - //NOTE: txspec can still be optionally specified; if specified it takes precedence over - // specification persisted in txmtd during transform. - } - - Expression outNames = getVarParam(TF_FN_PARAM_OUTNAMES); - if ( outNames != null ) { - if( outNames.getOutput().getDataType() != DataType.SCALAR || outNames.getOutput().getValueType() != ValueType.STRING ) - raiseValidateError("The parameter specifying column names in the output file '" + TF_FN_PARAM_MTD + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - if ( applyMTD != null) - raiseValidateError("Only one of '" + TF_FN_PARAM_APPLYMTD + "' or '" + TF_FN_PARAM_OUTNAMES + "' can be specified in transform().", conditional, LanguageErrorCodes.INVALID_PARAMETERS); - } - - // disable frame csv reblocks as transform operates directly over csv files - // (this is required to support both file-based transform and frame-based - // transform at the same time; hence, transform and frame-based transform - // functions over csv cannot be used in the same script; accordingly we - // give an appropriate warning) - OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = false; - raiseValidateError("Disable frame csv reblock to support file-based transform.", 
true); - - // Output is a matrix with same dims as input - output.setDataType(DataType.MATRIX); - output.setFormatType(FormatType.CSV); - output.setValueType(ValueType.DOUBLE); - // Output dimensions may not be known at compile time, for example when dummycoding. - output.setDimensions(-1, -1); - } - // example: A = transformapply(target=X, meta=M, spec=s) private void validateTransformApply(DataIdentifier output, boolean conditional) throws LanguageException http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java index eb69068..8635833 100644 --- a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java +++ b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java @@ -46,7 +46,7 @@ public class ParameterizedBuiltin extends ValueFunction public enum ParameterizedBuiltinCode { CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, - TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE }; + TRANSFORMAPPLY, TRANSFORMDECODE }; public enum ProbabilityDistributionCode { INVALID, NORMAL, EXP, CHISQ, F, T }; @@ -62,7 +62,6 @@ public class ParameterizedBuiltin extends ValueFunction String2ParameterizedBuiltinCode.put( "rmempty", ParameterizedBuiltinCode.RMEMPTY); String2ParameterizedBuiltinCode.put( "replace", ParameterizedBuiltinCode.REPLACE); String2ParameterizedBuiltinCode.put( "rexpand", ParameterizedBuiltinCode.REXPAND); - String2ParameterizedBuiltinCode.put( "transform", ParameterizedBuiltinCode.TRANSFORM); String2ParameterizedBuiltinCode.put( "transformapply", ParameterizedBuiltinCode.TRANSFORMAPPLY); String2ParameterizedBuiltinCode.put( "transformdecode", ParameterizedBuiltinCode.TRANSFORMDECODE); } @@ -169,9 
+168,6 @@ public class ParameterizedBuiltin extends ValueFunction case REXPAND: return new ParameterizedBuiltin(ParameterizedBuiltinCode.REXPAND); - case TRANSFORM: - return new ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORM); - case TRANSFORMAPPLY: return new ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMAPPLY); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java index 52c11e6..9126ab7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java @@ -191,7 +191,6 @@ public class CPInstructionParser extends InstructionParser String2CPInstructionType.put( "rmempty" , CPINSTRUCTION_TYPE.ParameterizedBuiltin); String2CPInstructionType.put( "replace" , CPINSTRUCTION_TYPE.ParameterizedBuiltin); String2CPInstructionType.put( "rexpand" , CPINSTRUCTION_TYPE.ParameterizedBuiltin); - String2CPInstructionType.put( "transform" , CPINSTRUCTION_TYPE.ParameterizedBuiltin); String2CPInstructionType.put( "transformapply",CPINSTRUCTION_TYPE.ParameterizedBuiltin); String2CPInstructionType.put( "transformdecode",CPINSTRUCTION_TYPE.ParameterizedBuiltin); String2CPInstructionType.put( "transformencode",CPINSTRUCTION_TYPE.MultiReturnParameterizedBuiltin); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java index 
0b9cb7d..5ced2ab 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java @@ -288,7 +288,6 @@ public class MRInstructionParser extends InstructionParser //dummy (pseudo instructions) String2MRInstructionType.put( "sort", MRINSTRUCTION_TYPE.Sort); String2MRInstructionType.put( "csvwrite", MRINSTRUCTION_TYPE.CSVWrite); - String2MRInstructionType.put( "transform", MRINSTRUCTION_TYPE.Transform); //parameterized builtins String2MRInstructionType.put( "replace", MRINSTRUCTION_TYPE.ParameterizedBuiltin); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index 5ca3847..a2b0ef1 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -230,7 +230,6 @@ public class SPInstructionParser extends InstructionParser String2SPInstructionType.put( "rmempty" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "replace" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "rexpand" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); - String2SPInstructionType.put( "transform" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "transformapply",SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "transformdecode",SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "transformencode",SPINSTRUCTION_TYPE.MultiReturnBuiltin); 
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java index a31fe94..847d5f9 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java @@ -33,12 +33,10 @@ import org.apache.sysml.runtime.functionobjects.ParameterizedBuiltin; import org.apache.sysml.runtime.functionobjects.ValueFunction; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction; -import org.apache.sysml.runtime.matrix.JobReturn; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.SimpleOperator; -import org.apache.sysml.runtime.transform.DataTransform; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.transform.decode.Decoder; import org.apache.sysml.runtime.transform.decode.DecoderFactory; @@ -141,8 +139,7 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode); return new ParameterizedBuiltinCPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str); } - else if ( opcode.equals("transform") - || opcode.equals("transformapply") + else if ( opcode.equals("transformapply") || opcode.equals("transformdecode") || opcode.equals("transformmeta")) { @@ -255,16 +252,6 @@ public class 
ParameterizedBuiltinCPInstruction extends ComputationCPInstruction ec.setMatrixOutput(output.getName(), ret); ec.releaseMatrixInput(params.get("target")); } - else if ( opcode.equalsIgnoreCase("transform")) { - FrameObject fo = ec.getFrameObject(params.get("target")); - MatrixObject out = ec.getMatrixObject(output.getName()); - try { - JobReturn jt = DataTransform.cpDataTransform(this, new FrameObject[] { fo } , new MatrixObject[] {out} ); - out.updateMatrixCharacteristics(jt.getMatrixCharacteristics(0)); - } catch (Exception e) { - throw new DMLRuntimeException(e); - } - } else if ( opcode.equalsIgnoreCase("transformapply")) { //acquire locks FrameBlock data = ec.getFrameInput(params.get("target")); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java index 68132fc..ad98ad7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java @@ -32,7 +32,7 @@ public abstract class MRInstruction extends Instruction { public enum MRINSTRUCTION_TYPE { INVALID, Append, Aggregate, ArithmeticBinary, ArithmeticBinary2, AggregateBinary, AggregateUnary, - Rand, Seq, CSVReblock, CSVWrite, Transform, + Rand, Seq, CSVReblock, CSVWrite, Reblock, Reorg, Replicate, Unary, CombineBinary, CombineUnary, CombineTernary, PickByCount, Partition, Ternary, Quaternary, CM_N_COV, MapGroupedAggregate, GroupedAggregate, RangeReIndex, ZeroOut, MMTSJ, PMMJ, MatrixReshape, ParameterizedBuiltin, Sort, MapMultChain, CumsumAggregate, CumsumSplit, CumsumOffset, BinUaggChain, UaggOuterChain, RemoveEmpty}; 
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java index 5890bf9..254c3d7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java @@ -56,12 +56,12 @@ import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.operators.Operator; -import org.apache.sysml.runtime.transform.MVImputeAgent; -import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod; -import org.apache.sysml.runtime.transform.RecodeAgent; import org.apache.sysml.runtime.transform.encode.Encoder; import org.apache.sysml.runtime.transform.encode.EncoderComposite; import org.apache.sysml.runtime.transform.encode.EncoderFactory; +import org.apache.sysml.runtime.transform.encode.EncoderMVImpute; +import org.apache.sysml.runtime.transform.encode.EncoderRecode; +import org.apache.sysml.runtime.transform.encode.EncoderMVImpute.MVMethod; import org.apache.sysml.runtime.transform.meta.TfMetaUtils; import org.apache.sysml.runtime.transform.meta.TfOffsetMap; @@ -129,7 +129,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI .distinct().groupByKey() .flatMap(new TransformEncodeGroupFunction(accMax)); if( containsMVImputeEncoder(encoderBuild) ) { - MVImputeAgent mva = getMVImputeEncoder(encoderBuild); + EncoderMVImpute mva = 
getMVImputeEncoder(encoderBuild); rcMaps = rcMaps.union( in.mapPartitionsToPair(new TransformEncodeBuild2Function(mva)) .groupByKey().flatMap(new TransformEncodeGroup2Function(mva)) ); @@ -149,7 +149,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI omap = new TfOffsetMap(SparkUtils.toIndexedLong(in.mapToPair( new RDDTransformApplyOffsetFunction(spec, colnames)).collect())); } - + //create encoder broadcast (avoiding replication per task) Encoder encoder = EncoderFactory.createEncoder(spec, colnames, fo.getSchema(), (int)fo.getNumColumns(), meta); @@ -176,16 +176,16 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI private boolean containsMVImputeEncoder(Encoder encoder) { if( encoder instanceof EncoderComposite ) for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() ) - if( cencoder instanceof MVImputeAgent ) + if( cencoder instanceof EncoderMVImpute ) return true; return false; } - private MVImputeAgent getMVImputeEncoder(Encoder encoder) { + private EncoderMVImpute getMVImputeEncoder(Encoder encoder) { if( encoder instanceof EncoderComposite ) for( Encoder cencoder : ((EncoderComposite)encoder).getEncoders() ) - if( cencoder instanceof MVImputeAgent ) - return (MVImputeAgent) cencoder; + if( cencoder instanceof EncoderMVImpute ) + return (EncoderMVImpute) cencoder; return null; } @@ -248,12 +248,12 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI { private static final long serialVersionUID = 6336375833412029279L; - private RecodeAgent _raEncoder = null; + private EncoderRecode _raEncoder = null; public TransformEncodeBuildFunction(Encoder encoder) { for( Encoder cEncoder : ((EncoderComposite)encoder).getEncoders() ) - if( cEncoder instanceof RecodeAgent ) - _raEncoder = (RecodeAgent)cEncoder; + if( cEncoder instanceof EncoderRecode ) + _raEncoder = (EncoderRecode)cEncoder; } @Override @@ -310,7 +310,7 @@ public class 
MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI sb.append(' '); sb.append(colID); sb.append(' '); - sb.append(RecodeAgent.constructRecodeMapEntry( + sb.append(EncoderRecode.constructRecodeMapEntry( iter.next().toString(), rowID)); ret.add(sb.toString()); sb.setLength(0); @@ -326,9 +326,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI { private static final long serialVersionUID = 6336375833412029279L; - private MVImputeAgent _encoder = null; + private EncoderMVImpute _encoder = null; - public TransformEncodeBuild2Function(MVImputeAgent encoder) { + public TransformEncodeBuild2Function(EncoderMVImpute encoder) { _encoder = encoder; } @@ -370,9 +370,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI { private static final long serialVersionUID = 702100641492347459L; - private MVImputeAgent _encoder = null; + private EncoderMVImpute _encoder = null; - public TransformEncodeGroup2Function(MVImputeAgent encoder) { + public TransformEncodeGroup2Function(EncoderMVImpute encoder) { _encoder = encoder; } http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index 2a80cfc..179ef9e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -38,7 +38,6 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.parser.Statement; import org.apache.sysml.runtime.DMLRuntimeException; import 
org.apache.sysml.runtime.controlprogram.caching.FrameObject; -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -73,7 +72,6 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator; import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes; import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.SimpleOperator; -import org.apache.sysml.runtime.transform.DataTransform; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.transform.decode.Decoder; import org.apache.sysml.runtime.transform.decode.DecoderFactory; @@ -173,7 +171,6 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction } else if( opcode.equalsIgnoreCase("rexpand") || opcode.equalsIgnoreCase("replace") - || opcode.equalsIgnoreCase("transform") || opcode.equalsIgnoreCase("transformapply") || opcode.equalsIgnoreCase("transformdecode")) { @@ -432,17 +429,6 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); mcOut.set(dirRows?lmaxVal:mcIn.getRows(), dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, -1); } - else if ( opcode.equalsIgnoreCase("transform") ) - { - // perform data transform on Spark - try { - DataTransform.spDataTransform( this, - new FrameObject[] { sec.getFrameObject(params.get("target")) }, - new MatrixObject[] { sec.getMatrixObject(output.getName()) }, ec); - } catch (Exception e) { - throw new DMLRuntimeException(e); - } - } else if ( opcode.equalsIgnoreCase("transformapply") ) { //get input RDD and meta data 
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java index c30c85f..8bfee87 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java @@ -56,10 +56,6 @@ public class WriteSPInstruction extends SPInstruction private CPOperand input3 = null; private CPOperand input4 = null; private FileFormatProperties formatProperties; - - //scalars might occur for transform - // TODO remove once transform over frames supported - private boolean isInputMatrixBlock = true; public WriteSPInstruction(CPOperand in1, CPOperand in2, CPOperand in3, String opcode, String str) { super(opcode, str); @@ -100,10 +96,6 @@ public class WriteSPInstruction extends SPInstruction boolean sparse = Boolean.parseBoolean(parts[6]); FileFormatProperties formatProperties = new CSVFileFormatProperties(hasHeader, delim, sparse); inst.setFormatProperties(formatProperties); - - boolean isInputMB = Boolean.parseBoolean(parts[7]); - inst.setInputMatrixBlock(isInputMB); - CPOperand in4 = new CPOperand(parts[8]); inst.input4 = in4; } else { @@ -125,14 +117,6 @@ public class WriteSPInstruction extends SPInstruction formatProperties = prop; } - public void setInputMatrixBlock(boolean isMB) { - isInputMatrixBlock = isMB; - } - - public boolean isInputMatrixBlock() { - return isInputMatrixBlock; - } - @Override public void processInstruction(ExecutionContext ec) throws DMLRuntimeException @@ -180,7 +164,7 @@ public class WriteSPInstruction extends SPInstruction { //piggyback nnz maintenance on write LongAccumulator aNnz = null; - if ( isInputMatrixBlock && 
!mc.nnzKnown() ) { + if( !mc.nnzKnown() ) { aNnz = sec.getSparkContext().sc().longAccumulator("nnz"); in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); } @@ -202,57 +186,25 @@ public class WriteSPInstruction extends SPInstruction else customSaveTextFile(ijv, fname, false); - if ( isInputMatrixBlock && !mc.nnzKnown() ) + if( !mc.nnzKnown() ) mc.setNonZeros( aNnz.value() ); } else if( oi == OutputInfo.CSVOutputInfo ) { - JavaRDD<String> out = null; LongAccumulator aNnz = null; - if ( isInputMatrixBlock ) { - //piggyback nnz computation on actual write - if( !mc.nnzKnown() ) { - aNnz = sec.getSparkContext().sc().longAccumulator("nnz"); - in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); - } - - out = RDDConverterUtils.binaryBlockToCsv(in1, mc, - (CSVFileFormatProperties) formatProperties, true); - } - else - { - // This case is applicable when the CSV output from transform() is written out - // TODO remove once transform over frames supported - @SuppressWarnings("unchecked") - JavaPairRDD<Long,String> rdd = (JavaPairRDD<Long, String>) (sec.getMatrixObject(input1.getName())).getRDDHandle().getRDD(); - out = rdd.values(); - - String sep = ","; - boolean hasHeader = false; - if(formatProperties != null) { - sep = ((CSVFileFormatProperties) formatProperties).getDelim(); - hasHeader = ((CSVFileFormatProperties) formatProperties).hasHeader(); - } - - if(hasHeader) { - StringBuffer buf = new StringBuffer(); - for(int j = 1; j < mc.getCols(); j++) { - if(j != 1) { - buf.append(sep); - } - buf.append("C" + j); - } - ArrayList<String> headerContainer = new ArrayList<String>(1); - headerContainer.add(0, buf.toString()); - JavaRDD<String> header = sec.getSparkContext().parallelize(headerContainer); - out = header.union(out); - } - } + //piggyback nnz computation on actual write + if( !mc.nnzKnown() ) { + aNnz = sec.getSparkContext().sc().longAccumulator("nnz"); + in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); + } + JavaRDD<String> out = 
RDDConverterUtils.binaryBlockToCsv( + in1, mc, (CSVFileFormatProperties) formatProperties, true); + customSaveTextFile(out, fname, false); - if( isInputMatrixBlock && !mc.nnzKnown() ) + if( !mc.nnzKnown() ) mc.setNonZeros((long)aNnz.value().longValue()); } else if( oi == OutputInfo.BinaryBlockOutputInfo ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java index b976545..3bd9758 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java @@ -55,7 +55,6 @@ import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups; -import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.MapReduceTool; @@ -281,17 +280,9 @@ public class CSVReblockMR otherInstructionsInReducer, numReducers, replication, resultIndexes, outputs, outputInfos, ret1.counterFile, smallestFiles); return ret; } - - public static AssignRowIDMRReturn runAssignRowIDMRJob(String[] inputs, InputInfo[] inputInfos, int[] brlens, int[] bclens, - String reblockInstructions, int replication, String[] smallestFiles) - throws Exception - { - return runAssignRowIDMRJob(inputs, inputInfos, brlens, bclens, reblockInstructions, replication, smallestFiles, false, null, null); - } - public static AssignRowIDMRReturn runAssignRowIDMRJob(String[] inputs, InputInfo[] inputInfos, int[] brlens, int[] bclens, - String reblockInstructions, int replication, String[] smallestFiles, boolean 
transform, String naStrings, String spec) + String reblockInstructions, int replication, String[] smallestFiles) throws Exception { AssignRowIDMRReturn ret=new AssignRowIDMRReturn(); @@ -347,16 +338,6 @@ public class CSVReblockMR job.setOutputKeyClass(ByteWritable.class); job.setOutputValueClass(OffsetCount.class); - // setup properties relevant to transform - job.setBoolean(MRJobConfiguration.TF_TRANSFORM, transform); - if (transform) - { - if ( naStrings != null) - // Adding "dummy" string to handle the case of na_strings = "" - job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(naStrings) ); - job.set(MRJobConfiguration.TF_SPEC, spec); - } - RunningJob runjob=JobClient.runJob(job); /* Process different counters */ http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java index 85b55cb..4f9877b 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java @@ -36,7 +36,6 @@ import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.CSVReblockMR; import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; -import org.apache.sysml.runtime.transform.TfUtils; public class CSVAssignRowIDMapper extends MapReduceBase implements Mapper<LongWritable, Text, ByteWritable, OffsetCount> { @@ -51,9 +50,6 @@ public class CSVAssignRowIDMapper extends MapReduceBase implements Mapper<LongWr private String filename = ""; private boolean headerFile = false; - // members relevant to transform - private TfUtils _agents = null; - 
@Override public void map(LongWritable key, Text value, OutputCollector<ByteWritable, OffsetCount> out, Reporter report) @@ -69,7 +65,7 @@ public class CSVAssignRowIDMapper extends MapReduceBase implements Mapper<LongWr if(key.get()==0 && headerFile) { if(!ignoreFirstLine) { report.incrCounter(CSVReblockMR.NUM_COLS_IN_MATRIX, outKey.toString(), value.toString().split(delim, -1).length); - num += omit(value.toString()) ? 0 : 1; + num++; } else realFirstLine = true; @@ -79,7 +75,7 @@ public class CSVAssignRowIDMapper extends MapReduceBase implements Mapper<LongWr report.incrCounter(CSVReblockMR.NUM_COLS_IN_MATRIX, outKey.toString(), value.toString().split(delim, -1).length); realFirstLine = false; } - num += omit(value.toString()) ? 0 : 1; + num++; } } @@ -107,23 +103,12 @@ public class CSVAssignRowIDMapper extends MapReduceBase implements Mapper<LongWr ignoreFirstLine = ins.hasHeader; break; } - - // load properties relevant to transform - boolean omit = job.getBoolean(MRJobConfiguration.TF_TRANSFORM, false); - if ( omit ) - _agents = new TfUtils(job, true); } catch(Exception e) { throw new RuntimeException(e); } } - private boolean omit(String line) { - if(_agents == null) - return false; - return _agents.omit( line.split(delim, -1) ); - } - @Override public void close() throws IOException { if( outCache != null ) //robustness empty splits http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java index 50a7412..88512b6 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java @@ -231,33 +231,6 @@ public class MRJobConfiguration */ public 
static final String NUM_NONZERO_CELLS="nonzeros"; - public static final String TF_NUM_COLS = "transform.num.columns"; - public static final String TF_HAS_HEADER = "transform.has.header"; - public static final String TF_DELIM = "transform.field.delimiter"; - public static final String TF_NA_STRINGS = "transform.na.strings"; - public static final String TF_HEADER = "transform.header.line"; - public static final String TF_SPEC = "transform.specification"; - public static final String TF_TMP_LOC = "transform.temp.location"; - public static final String TF_TRANSFORM = "transform.omit.na.rows"; - - public static final String TF_SMALLEST_FILE= "transform.smallest.file"; - public static final String TF_OFFSETS_FILE = "transform.offsets.file"; - public static final String TF_TXMTD_PATH = "transform.txmtd.path"; - - /*public static enum DataTransformJobProperty - { - RCD_NUM_COLS("recode.num.columns"); - - private final String name; - private DataTransformJobProperty(String n) { - name = n; - } - }*/ - - public static enum DataTransformCounters { - TRANSFORMED_NUM_ROWS - }; - public static final int getMiscMemRequired(JobConf job) { return job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096); http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java deleted file mode 100644 index e932d1e..0000000 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.transform; -import java.util.HashSet; - -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.Counters.Group; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.runtime.instructions.InstructionParser; -import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction; -import org.apache.sysml.runtime.matrix.CSVReblockMR; -import org.apache.sysml.runtime.matrix.CSVReblockMR.BlockRow; -import org.apache.sysml.runtime.matrix.JobReturn; -import org.apache.sysml.runtime.matrix.WriteCSVMR; -import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; -import org.apache.sysml.runtime.matrix.data.InputInfo; -import org.apache.sysml.runtime.matrix.data.OutputInfo; -import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes; -import org.apache.sysml.runtime.matrix.mapred.CSVReblockReducer; -import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups; -import 
org.apache.sysml.runtime.util.MapReduceTool; - -/** - * MapReduce job that performs the actual data transformations, such as recoding - * and binning. In contrast to ApplyTxCSVMR, this job generates the output in - * BinaryBlock format. This job takes a data set as well as the transformation - * metadata (which, for example, computed from GenTxMtdMR) as inputs. - * - */ - -@SuppressWarnings("deprecation") -public class ApplyTfBBMR { - - public static JobReturn runJob(String inputPath, String rblkInst, String otherInst, String spec, String mapsPath, String tmpPath, String outputPath, String partOffsetsFile, CSVFileFormatProperties inputDataProperties, long numRows, long numColsBefore, long numColsAfter, int replication, String headerLine) throws Exception { - - CSVReblockInstruction rblk = (CSVReblockInstruction) InstructionParser.parseSingleInstruction(rblkInst); - - long[] rlens = new long[]{numRows}; - long[] clens = new long[]{numColsAfter}; - int[] brlens = new int[]{rblk.brlen}; - int[] bclens = new int[]{rblk.bclen}; - byte[] realIndexes = new byte[]{rblk.input}; - byte[] resultIndexes = new byte[]{rblk.output}; - - JobConf job = new JobConf(ApplyTfBBMR.class); - job.setJobName("ApplyTfBB"); - - /* Setup MapReduce Job */ - job.setJarByClass(ApplyTfBBMR.class); - - // set relevant classes - job.setMapperClass(ApplyTfBBMapper.class); - - MRJobConfiguration.setUpMultipleInputs(job, realIndexes, new String[]{inputPath}, new InputInfo[]{InputInfo.CSVInputInfo}, brlens, bclens, false, ConvertTarget.CELL); - - MRJobConfiguration.setMatricesDimensions(job, realIndexes, rlens, clens); - MRJobConfiguration.setBlocksSizes(job, realIndexes, brlens, bclens); - - MRJobConfiguration.setCSVReblockInstructions(job, rblkInst); - - //set up the instructions that will happen in the reducer, after the aggregation instrucions - MRJobConfiguration.setInstructionsInReducer(job, otherInst); - - job.setInt(MRConfigurationNames.DFS_REPLICATION, replication); - - //set up preferred 
custom serialization framework for binary block format - if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) - MRJobConfiguration.addBinaryBlockSerializationFramework( job ); - - //set up what matrices are needed to pass from the mapper to reducer - HashSet<Byte> mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, realIndexes, null, - rblkInst, null, otherInst, resultIndexes); - - MatrixChar_N_ReducerGroups ret=MRJobConfiguration.computeMatrixCharacteristics(job, realIndexes, - null, rblkInst, null, null, null, resultIndexes, mapoutputIndexes, false); - - //set up the number of reducers - int numRed = WriteCSVMR.determineNumReducers(rlens, clens, ConfigurationManager.getNumReducers(), ret.numReducerGroups); - job.setNumReduceTasks( numRed ); - - //set up the multiple output files, and their format information - MRJobConfiguration.setUpMultipleOutputs(job, new byte[]{rblk.output}, new byte[]{0}, new String[]{outputPath}, new OutputInfo[]{OutputInfo.BinaryBlockOutputInfo}, true, false); - - // configure mapper and the mapper output key value pairs - job.setMapperClass(ApplyTfBBMapper.class); - job.setMapOutputKeyClass(TaggedFirstSecondIndexes.class); - job.setMapOutputValueClass(BlockRow.class); - - //configure reducer - job.setReducerClass(CSVReblockReducer.class); - - //turn off adaptivemr - job.setBoolean("adaptivemr.map.enable", false); - - //set unique working dir - MRJobConfiguration.setUniqueWorkingDir(job); - - // Add transformation metadata file as well as partOffsetsFile to Distributed cache - DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), job); - DistributedCache.createSymlink(job); - - Path cachefile=new Path(new Path(partOffsetsFile), "part-00000"); - DistributedCache.addCacheFile(cachefile.toUri(), job); - DistributedCache.createSymlink(job); - - job.set(MRJobConfiguration.TF_HAS_HEADER, Boolean.toString(inputDataProperties.hasHeader())); - job.set(MRJobConfiguration.TF_DELIM, inputDataProperties.getDelim()); - // Adding 
"dummy" string to handle the case of na_strings = "" - if( inputDataProperties.getNAStrings() != null ) - job.set(MRJobConfiguration.TF_NA_STRINGS, TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) ); - job.set(MRJobConfiguration.TF_SPEC, spec); - job.set(MRJobConfiguration.TF_SMALLEST_FILE, CSVReblockMR.findSmallestFile(job, inputPath)); - job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, outputPath); - job.setLong(MRJobConfiguration.TF_NUM_COLS, numColsBefore); - job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath); - job.set(MRJobConfiguration.TF_HEADER, headerLine); - job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString()); - job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath); - - RunningJob runjob=JobClient.runJob(job); - - MapReduceTool.deleteFileIfExistOnHDFS(cachefile, job); - - Group group=runjob.getCounters().getGroup(MRJobConfiguration.NUM_NONZERO_CELLS); - for(int i=0; i<resultIndexes.length; i++) { - ret.stats[i].setNonZeros(group.getCounter(Integer.toString(i))); - } - return new JobReturn(ret.stats, runjob.isSuccessful()); - } - -}
