[SYSTEMML-1300] Remove file-based transform from compiler/runtime

This patch removes the old (deprecated) file-based transform in favor of
the new frame-based transform. In detail, this includes the removal of the
existing CP/Spark/MR file-based implementations, the entire transform()
language/compiler/runtime integration, and the transform-specific tests,
as well as a refactoring of the existing encoders and various cleanups.

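For context, the remaining frame-based API consists of the transformencode,
transformapply, and transformdecode built-ins. A minimal DML sketch of the
intended replacement usage is shown below; the input path and transformation
spec are illustrative assumptions only, not part of this patch:

    # read the input as a frame in csv format (path is a placeholder)
    F = read("/tmp/data.csv", data_type="frame", format="csv");

    # example transform specification: recode columns 1-2, dummycode column 3
    jspec = "{ ids: true, recode: [ 1, 2 ], dummycode: [ 3 ] }";

    # encode the frame into a numeric matrix X and capture the metadata frame M
    [X, M] = transformencode(target=F, spec=jspec);

    # apply the captured metadata to another frame, or decode back to a frame
    X2 = transformapply(target=F, spec=jspec, meta=M);
    F2 = transformdecode(target=X, spec=jspec, meta=M);
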

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0cd3905f
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0cd3905f
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0cd3905f

Branch: refs/heads/master
Commit: 0cd3905f592b5ee0c867ebc952d9ba367fb8a0c9
Parents: 8a27573
Author: Matthias Boehm <[email protected]>
Authored: Wed Jun 7 22:12:01 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jun 8 12:24:12 2017 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java    |   11 +-
 .../org/apache/sysml/hops/OptimizerUtils.java   |   10 -
 .../sysml/hops/ParameterizedBuiltinOp.java      |   20 -
 .../apache/sysml/hops/recompile/Recompiler.java |   21 -
 .../sysml/hops/rewrite/HopRewriteUtils.java     |   17 -
 .../rewrite/RewriteBlockSizeAndReblock.java     |   26 +-
 .../RewriteInjectSparkPReadCheckpointing.java   |    5 +-
 .../rewrite/RewriteSplitDagUnknownCSVRead.java  |    3 +-
 .../java/org/apache/sysml/lops/CSVReBlock.java  |   41 +-
 src/main/java/org/apache/sysml/lops/Data.java   |   25 +-
 .../apache/sysml/lops/ParameterizedBuiltin.java |   64 +-
 src/main/java/org/apache/sysml/lops/Unary.java  |   10 +-
 .../java/org/apache/sysml/lops/compile/Dag.java |   58 +-
 .../org/apache/sysml/lops/compile/JobType.java  |   18 +-
 .../apache/sysml/lops/runtime/RunMRJobs.java    |   34 +-
 .../org/apache/sysml/parser/DMLTranslator.java  |    7 -
 .../org/apache/sysml/parser/Expression.java     |    2 +-
 .../ParameterizedBuiltinFunctionExpression.java |   68 +-
 .../functionobjects/ParameterizedBuiltin.java   |    6 +-
 .../instructions/CPInstructionParser.java       |    1 -
 .../instructions/MRInstructionParser.java       |    1 -
 .../instructions/SPInstructionParser.java       |    1 -
 .../cp/ParameterizedBuiltinCPInstruction.java   |   15 +-
 .../runtime/instructions/mr/MRInstruction.java  |    2 +-
 ...ReturnParameterizedBuiltinSPInstruction.java |   34 +-
 .../ParameterizedBuiltinSPInstruction.java      |   14 -
 .../instructions/spark/WriteSPInstruction.java  |   70 +-
 .../sysml/runtime/matrix/CSVReblockMR.java      |   21 +-
 .../matrix/mapred/CSVAssignRowIDMapper.java     |   19 +-
 .../matrix/mapred/MRJobConfiguration.java       |   27 -
 .../sysml/runtime/transform/ApplyTfBBMR.java    |  155 --
 .../runtime/transform/ApplyTfBBMapper.java      |  157 --
 .../sysml/runtime/transform/ApplyTfCSVMR.java   |  129 --
 .../runtime/transform/ApplyTfCSVMapper.java     |  113 --
 .../runtime/transform/ApplyTfCSVSPARK.java      |  164 --
 .../sysml/runtime/transform/BinAgent.java       |  382 -----
 .../sysml/runtime/transform/DataTransform.java  | 1496 ------------------
 .../sysml/runtime/transform/DistinctValue.java  |  105 --
 .../sysml/runtime/transform/DummycodeAgent.java |  461 ------
 .../sysml/runtime/transform/GTFMTDMapper.java   |  111 --
 .../sysml/runtime/transform/GTFMTDReducer.java  |  127 --
 .../sysml/runtime/transform/GenTfMtdMR.java     |  105 --
 .../sysml/runtime/transform/GenTfMtdSPARK.java  |  240 ---
 .../sysml/runtime/transform/MVImputeAgent.java  | 1046 ------------
 .../sysml/runtime/transform/OmitAgent.java      |  148 --
 .../sysml/runtime/transform/RecodeAgent.java    |  534 -------
 .../apache/sysml/runtime/transform/TfUtils.java |  446 +-----
 .../sysml/runtime/transform/encode/Encoder.java |   17 -
 .../runtime/transform/encode/EncoderBin.java    |  188 +++
 .../transform/encode/EncoderComposite.java      |   25 -
 .../transform/encode/EncoderDummycode.java      |  139 ++
 .../transform/encode/EncoderFactory.java        |   16 +-
 .../transform/encode/EncoderMVImpute.java       |  422 +++++
 .../runtime/transform/encode/EncoderOmit.java   |  123 ++
 .../transform/encode/EncoderPassThrough.java    |   25 -
 .../runtime/transform/encode/EncoderRecode.java |  253 +++
 .../runtime/transform/meta/TfMetaUtils.java     |    1 -
 .../functions/frame/FrameFunctionTest.java      |    3 -
 .../functions/frame/FrameMatrixReblockTest.java |    6 -
 .../functions/frame/FrameMetaReadWriteTest.java |    6 -
 .../transform/FrameCSVReadWriteTest.java        |    4 -
 .../functions/transform/RunTest.java            |  268 ----
 .../functions/transform/ScalingTest.java        |  244 ---
 .../transform/TransformAndApplyTest.java        |  143 --
 .../TransformCSVFrameEncodeDecodeTest.java      |    6 +-
 .../TransformCSVFrameEncodeReadTest.java        |    6 +-
 .../transform/TransformFrameApplyTest.java      |  194 ---
 .../TransformFrameEncodeApplyTest.java          |    4 -
 .../TransformFrameEncodeDecodeTest.java         |    4 -
 .../TransformFrameEncodeDecodeTokenTest.java    |    4 -
 .../transform/TransformReadMetaTest.java        |  205 ---
 .../functions/transform/TransformTest.java      |  709 ---------
 src/test/scripts/functions/transform/Apply.dml  |   30 -
 .../scripts/functions/transform/ApplyFrame.dml  |   29 -
 src/test/scripts/functions/transform/Scaling.R  |   36 -
 .../scripts/functions/transform/Scaling.dml     |   31 -
 .../scripts/functions/transform/Transform.dml   |   31 -
 .../functions/transform/TransformAndApply.dml   |   37 -
 .../transform/TransformAndApplySpecX.json       |    5 -
 .../transform/TransformAndApplySpecY.json       |    5 -
 .../functions/transform/TransformReadMeta.dml   |   33 -
 .../functions/transform/TransformReadMeta2.dml  |   33 -
 .../transform/TransformReadMetaSpecX.json       |    5 -
 .../functions/transform/Transform_colnames.dml  |   32 -
 .../functions/transform/ZPackageSuite.java      |    6 -
 85 files changed, 1209 insertions(+), 8689 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java 
b/src/main/java/org/apache/sysml/hops/Hop.java
index 6c7089b..31e3aa6 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -288,11 +288,9 @@ public abstract class Hop
                        
                        try
                        {
-                               if(    (this instanceof DataOp  // CSV
-                                                       && 
((DataOp)this).getDataOpType() == DataOpTypes.PERSISTENTREAD
-                                                       && 
((DataOp)this).getInputFormatType() == FileFormatTypes.CSV ) 
-                                       || (this instanceof 
ParameterizedBuiltinOp 
-                                                       && 
((ParameterizedBuiltinOp)this).getOp() == ParamBuiltinOp.TRANSFORM) )
+                               if( this instanceof DataOp  // CSV
+                                       && ((DataOp)this).getDataOpType() == 
DataOpTypes.PERSISTENTREAD
+                                       && ((DataOp)this).getInputFormatType() 
== FileFormatTypes.CSV  )
                                {
                                        reblock = new CSVReBlock( input, 
getRowsInBlock(), getColsInBlock(), 
                                                        getDataType(), 
getValueType(), et);
@@ -1038,7 +1036,7 @@ public abstract class Hop
 
        public enum ParamBuiltinOp {
                INVALID, CDF, INVCDF, GROUPEDAGG, RMEMPTY, REPLACE, REXPAND, 
-               TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA,
+               TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA,
                TOSTRING
        };
 
@@ -1298,7 +1296,6 @@ public abstract class Hop
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.RMEMPTY, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.RMEMPTY);
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REPLACE, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REPLACE);
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REXPAND, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REXPAND);
-               HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORM, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM);
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMAPPLY, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMAPPLY);      
        
                
HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMDECODE, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMDECODE);
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMMETA, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMMETA);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 0c1c838..a40e36c 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -137,11 +137,6 @@ public class OptimizerUtils
        public static boolean ALLOW_WORSTCASE_SIZE_EXPRESSION_EVALUATION = true;
 
        public static boolean ALLOW_RAND_JOB_RECOMPILE = true;
-       
-       /**
-        * Enables CP-side data transformation for small files.
-        */
-       public static boolean ALLOW_TRANSFORM_RECOMPILE = true;
 
        /**
         * Enables parfor runtime piggybacking of MR jobs into the packed jobs 
for
@@ -205,11 +200,6 @@ public class OptimizerUtils
         */
        public static final boolean ALLOW_COMBINE_FILE_INPUT_FORMAT = true;
        
-       /**
-        * Enables automatic csv-binary block reblock.
-        */
-       public static boolean ALLOW_FRAME_CSV_REBLOCK = true;
-       
        
        public static long GPU_MEMORY_BUDGET = -1;
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java 
b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 74542f4..7bff4bd 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -35,7 +35,6 @@ import org.apache.sysml.lops.GroupedAggregateM;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.lops.LopsException;
-import org.apache.sysml.lops.OutputParameters.Format;
 import org.apache.sysml.lops.PMMJ;
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.lops.ParameterizedBuiltin;
@@ -201,20 +200,6 @@ public class ParameterizedBuiltinOp extends Hop implements 
MultiThreadedHop
                                constructLopsRExpand(inputlops, et);
                                break;
                        } 
-                       case TRANSFORM: {
-                               ExecType et = optFindExecType();
-                               
-                               ParameterizedBuiltin pbilop = new 
ParameterizedBuiltin(inputlops,
-                                               
HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et);
-                               setOutputDimensions(pbilop);
-                               setLineNumbers(pbilop);
-                               // output of transform is always in CSV format
-                               // to produce a blocked output, this lop must 
be 
-                               // fed into CSV Reblock lop.
-                               
pbilop.getOutputParameters().setFormat(Format.CSV);
-                               setLops(pbilop);
-                               break;
-                       }
                        case CDF:
                        case INVCDF: 
                        case REPLACE:
@@ -1084,11 +1069,6 @@ public class ParameterizedBuiltinOp extends Hop 
implements MultiThreadedHop
                }
                else 
                {
-                       if( _op == ParamBuiltinOp.TRANSFORM ) {
-                               // force remote, at runtime cp transform 
triggered for small files.
-                               return (_etype = REMOTE);
-                       }
-                       
                        if ( OptimizerUtils.isMemoryBasedOptLevel() ) {
                                _etype = findExecTypeByMemEstimate();
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index 9942035..c92b735 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -125,7 +125,6 @@ public class Recompiler
        //note that we scale this threshold up by the degree of available 
parallelism
        private static final long CP_REBLOCK_THRESHOLD_SIZE = 
(long)1024*1024*1024; 
        private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = 
(long)256*1024*1024;
-       private static final long CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE = 
(long)1024*1024*1024;
        
        /** Local reused rewriter for dynamic rewrites during recompile */
 
@@ -1838,26 +1837,6 @@ public class Recompiler
                return (estFilesize < cpThreshold);
        }
        
-       public static boolean checkCPTransform(MRJobInstruction inst, 
MatrixObject[] inputs) 
-               throws DMLRuntimeException, IOException 
-       {
-               boolean ret = true;
-               
-               MatrixObject input = inputs[0]; // there can only be one input 
in TRANSFORM job
-               
-               Path path = new Path(input.getFileName());
-               long sizeOnHDFS = MapReduceTool.getFilesizeOnHDFS(path);
-               
-               // dimensions are not checked here, since the worst case 
dimensions 
-               // after transformations (with potential dummycoding) are 
typically unknown.
-               
-               if( sizeOnHDFS > CP_TRANSFORM_UNKNOWN_THRESHOLD_SIZE 
-                               || sizeOnHDFS*4 > 
OptimizerUtils.getLocalMemBudget() )
-                       ret = false;
-               LOG.info("checkCPTransform(): size = " + sizeOnHDFS + ", 
recompile to CP = " + ret);
-               return ret;
-       }
-       
        public static boolean checkCPDataGen( MRJobInstruction inst, String 
updatedRandInst ) 
                throws DMLRuntimeException 
        {

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java 
b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
index b406bb7..bec7b38 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
@@ -982,23 +982,6 @@ public class HopRewriteUtils
                return ret;
        }
        
-       public static boolean hasTransformParents( Hop hop )
-       {
-               boolean ret = false;
-               
-               ArrayList<Hop> parents = hop.getParent();
-               for( Hop p : parents )
-               {
-                       if(    p instanceof ParameterizedBuiltinOp 
-                               && 
((ParameterizedBuiltinOp)p).getOp()==ParamBuiltinOp.TRANSFORM) {
-                               ret = true;
-                       }
-               }
-                       
-                               
-               return ret;
-       }
-       
        public static boolean alwaysRequiresReblock(Hop hop)
        {
                return (    hop instanceof DataOp 

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
index 2e2f91f..245a333 100644
--- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
+++ 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
@@ -28,11 +28,8 @@ import org.apache.sysml.hops.DataOp;
 import org.apache.sysml.hops.FunctionOp;
 import org.apache.sysml.hops.Hop;
 import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.hops.Hop.DataOpTypes;
 import org.apache.sysml.hops.Hop.FileFormatTypes;
-import org.apache.sysml.hops.Hop.ParamBuiltinOp;
 import org.apache.sysml.hops.HopsException;
-import org.apache.sysml.hops.ParameterizedBuiltinOp;
 import org.apache.sysml.parser.Expression.DataType;
 
 /**
@@ -98,7 +95,7 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule
                        if( canReblock && 
                                ( (dop.getDataType() == DataType.MATRIX && 
(dop.getRowsInBlock() != blocksize || dop.getColsInBlock() != blocksize))
                                ||(dop.getDataType() == DataType.FRAME && 
OptimizerUtils.isSparkExecutionMode() && 
(dop.getInputFormatType()==FileFormatTypes.TEXT
-                                                 || 
dop.getInputFormatType()==FileFormatTypes.CSV && 
OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK))) ) 
+                                                 || 
dop.getInputFormatType()==FileFormatTypes.CSV))) ) 
                        {
                                if( dop.getDataOpType() == 
DataOp.DataOpTypes.PERSISTENTREAD) 
                                {
@@ -146,27 +143,6 @@ public class RewriteBlockSizeAndReblock extends 
HopRewriteRule
                                }
                        }
                } 
-               //TODO remove once transform rebased to frames
-               else if ( (hop instanceof ParameterizedBuiltinOp && 
((ParameterizedBuiltinOp)hop).getOp() == ParamBuiltinOp.TRANSFORM) ) {
-                       
-                       // check if there exists a non-csv-write output. If 
yes, add reblock
-                       boolean rblk = false;
-                       for(Hop out : hop.getParent()) 
-                       {
-                               if ( !(out instanceof DataOp 
-                                               && 
((DataOp)out).getDataOpType() == DataOpTypes.PERSISTENTWRITE 
-                                               && 
((DataOp)out).getInputFormatType() == FileFormatTypes.CSV) )
-                               {
-                                       rblk = true;
-                                       break;
-                               }
-                       }
-                       if ( rblk )
-                       {
-                               hop.setRequiresReblock(true);
-                               hop.setOutputBlocksizes(blocksize, blocksize);
-                       }
-               }
                else //NO DATAOP 
                {
                        // TODO: following two lines are commented, and the 
subsequent hack is used instead!

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
index 9aceab4..755c2e8 100644
--- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
+++ 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteInjectSparkPReadCheckpointing.java
@@ -68,9 +68,8 @@ public class RewriteInjectSparkPReadCheckpointing extends 
HopRewriteRule
                        return;
                
                // The reblocking is performed after transform, and hence 
checkpoint only non-transformed reads.   
-               if(    (hop instanceof DataOp && 
((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD && 
!HopRewriteUtils.hasTransformParents(hop))
-                       || (hop.requiresReblock())
-                       )
+               if(    (hop instanceof DataOp && 
((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD)
+                       || hop.requiresReblock() )
                {
                        //make given hop for checkpointing (w/ default storage 
level)
                        //note: we do not recursively process childs here in 
order to prevent unnecessary checkpoints

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
index ee7b9c6..8396813 100644
--- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
+++ 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteSplitDagUnknownCSVRead.java
@@ -146,8 +146,7 @@ public class RewriteSplitDagUnknownCSVRead extends 
StatementBlockRewriteRule
                        if(    dop.getDataOpType() == DataOpTypes.PERSISTENTREAD
                                && dop.getInputFormatType() == 
FileFormatTypes.CSV
                                && !dop.dimsKnown()
-                               && !HopRewriteUtils.hasOnlyWriteParents(dop, 
true, false)
-                               && !HopRewriteUtils.hasTransformParents(hop) )
+                               && !HopRewriteUtils.hasOnlyWriteParents(dop, 
true, false) )
                        {
                                cand.add(dop);
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/CSVReBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/CSVReBlock.java 
b/src/main/java/org/apache/sysml/lops/CSVReBlock.java
index a15e036..55571f7 100644
--- a/src/main/java/org/apache/sysml/lops/CSVReBlock.java
+++ b/src/main/java/org/apache/sysml/lops/CSVReBlock.java
@@ -21,12 +21,10 @@ package org.apache.sysml.lops;
 
 import org.apache.sysml.lops.LopProperties.ExecLocation;
 import org.apache.sysml.lops.LopProperties.ExecType;
-import org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes;
 import org.apache.sysml.lops.compile.JobType;
 import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression;
 
 
 /**
@@ -55,18 +53,7 @@ public class CSVReBlock extends Lop
                boolean breaksAlignment = false;
                boolean aligner = false;
                boolean definesMRJob = true;
-               
-               // If the input to reblock is a tranform, then piggyback it 
along with transform
-               if ( input instanceof ParameterizedBuiltin 
-                               && ((ParameterizedBuiltin)input).getOp() == 
OperationTypes.TRANSFORM ) 
-               {
-                       definesMRJob = false;
-                       lps.addCompatibility(JobType.TRANSFORM);
-               }
-               else 
-               {
-                       lps.addCompatibility(JobType.CSV_REBLOCK);
-               }
+               lps.addCompatibility(JobType.CSV_REBLOCK);
                
                if(et == ExecType.MR) {
                        this.lps.setProperties( inputs, ExecType.MR, 
ExecLocation.MapAndReduce, breaksAlignment, aligner, definesMRJob );
@@ -88,19 +75,16 @@ public class CSVReBlock extends Lop
        private String prepCSVProperties() throws LopsException {
                StringBuilder sb = new StringBuilder();
 
-               boolean isSparkTransformInput = false;
-               Data dataInput = null;
-               if(getInputs().get(0).getType() == Type.Data)
-                       dataInput = (Data)getInputs().get(0);
-               else if ( getInputs().get(0).getType() == 
Type.ParameterizedBuiltin && ((ParameterizedBuiltin)getInputs().get(0)).getOp() 
== OperationTypes.TRANSFORM) {
-                       isSparkTransformInput = (getExecType() == 
ExecType.SPARK);
-                       dataInput = (Data) 
((ParameterizedBuiltin)getInputs().get(0)).getNamedInput(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_DATA);
-               }
+               Data dataInput = (Data)getInputs().get(0);
                
-               Lop headerLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_HAS_HEADER_ROW, 
String.valueOf(DataExpression.DEFAULT_DELIM_HAS_HEADER_ROW));
-               Lop delimLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_DELIMITER, 
DataExpression.DEFAULT_DELIM_DELIMITER);
-               Lop fillLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_FILL, 
String.valueOf(DataExpression.DEFAULT_DELIM_FILL)); 
-               Lop fillValueLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_FILL_VALUE, 
String.valueOf(DataExpression.DEFAULT_DELIM_FILL_VALUE));
+               Lop headerLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_HAS_HEADER_ROW, 
+                       
String.valueOf(DataExpression.DEFAULT_DELIM_HAS_HEADER_ROW));
+               Lop delimLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_DELIMITER, 
+                       DataExpression.DEFAULT_DELIM_DELIMITER);
+               Lop fillLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_FILL, 
+                       String.valueOf(DataExpression.DEFAULT_DELIM_FILL)); 
+               Lop fillValueLop = 
dataInput.getNamedInputLop(DataExpression.DELIM_FILL_VALUE, 
+                       
String.valueOf(DataExpression.DEFAULT_DELIM_FILL_VALUE));
                
                if (headerLop.isVariable())
                        throw new LopsException(this.printErrorLocation()
@@ -119,10 +103,7 @@ public class CSVReBlock extends Lop
                                        + "Parameter " + 
DataExpression.DELIM_FILL_VALUE
                                        + " must be a literal.");
 
-               // Output from transform() does not have a header
-               // On MR, reblock is piggybacked along with transform, and 
hence 
-               // specific information about header needn't be passed through 
instruction
-               sb.append( ((Data)headerLop).getBooleanValue() && 
!isSparkTransformInput );
+               sb.append( ((Data)headerLop).getBooleanValue() );
                sb.append( OPERAND_DELIMITOR );
                sb.append( ((Data)delimLop).getStringValue() );
                sb.append( OPERAND_DELIMITOR );

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/Data.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Data.java 
b/src/main/java/org/apache/sysml/lops/Data.java
index b8e40af..3d5db8a 100644
--- a/src/main/java/org/apache/sysml/lops/Data.java
+++ b/src/main/java/org/apache/sysml/lops/Data.java
@@ -139,18 +139,9 @@ public class Data extends Lop
                
 
                if ( getFileFormatType() == FileFormatTypes.CSV ) {
-                       Lop input = getInputs().get(0);
-                       // If the input is data transform, then csv write can 
be piggybacked onto TRANSFORM job.
-                       // Otherwise, the input must be converted to csv format 
via WriteCSV MR job.
-                       if ( input instanceof ParameterizedBuiltin 
-                                       && 
((ParameterizedBuiltin)input).getOp() == 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM ) {
-                               lps.addCompatibility(JobType.TRANSFORM);
-                               definesMRJob = false;
-                       }
-                       else {
-                               lps.addCompatibility(JobType.CSV_WRITE);
-                               definesMRJob = true;
-                       }
+                       // The input must be converted to csv format via 
WriteCSV MR job.
+                       lps.addCompatibility(JobType.CSV_WRITE);
+                       definesMRJob = true;
                }
                else {
                        /*
@@ -477,16 +468,8 @@ public class Data extends Lop
                                
                                if ( this.getExecType() == ExecType.SPARK ) 
                                {
-                                       boolean isInputMatrixBlock = true;
-                                       Lop input = getInputs().get(0);
-                                       if ( input instanceof 
ParameterizedBuiltin 
-                                                       && 
((ParameterizedBuiltin)input).getOp() == 
ParameterizedBuiltin.OperationTypes.TRANSFORM ) {
-                                               // in the case of transform 
input, the input will be Text strings insteadof MatrixBlocks 
-                                               // This information is used to 
have correct class information while accessing RDDs from the symbol table 
-                                               isInputMatrixBlock = false;
-                                       }
                                        sb.append(OPERAND_DELIMITOR);
-                                       sb.append(isInputMatrixBlock);
+                                       sb.append(true); //isInputMatrixBlock
                                }
                        }
                        

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java 
b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
index fdaf3c5..fb48d2f 100644
--- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
+++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
@@ -28,7 +28,6 @@ import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.lops.compile.JobType;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
-import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression;
 
 
 /**
@@ -40,7 +39,7 @@ public class ParameterizedBuiltin extends Lop
        
        public enum OperationTypes { 
                CDF, INVCDF, RMEMPTY, REPLACE, REXPAND,
-               TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA,
+               TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMMETA,
                TOSTRING
        };
        
@@ -99,11 +98,6 @@ public class ParameterizedBuiltin extends Lop
                        lps.addCompatibility(JobType.REBLOCK);
                        breaksAlignment=true;
                }
-               else if ( _operation == OperationTypes.TRANSFORM && et == 
ExecType.MR ) {
-                       definesMRJob = true;
-                       eloc = ExecLocation.MapAndReduce;
-                       lps.addCompatibility(JobType.TRANSFORM);
-               }
                else //executed in CP / CP_FILE / SPARK
                {
                        eloc = ExecLocation.ControlProgram;
@@ -212,8 +206,7 @@ public class ParameterizedBuiltin extends Lop
                                }
                                
                                break;
-                               
-                       case TRANSFORM: 
+                       
                        case TRANSFORMAPPLY:
                        case TRANSFORMDECODE:
                        case TRANSFORMMETA:     {
@@ -400,59 +393,6 @@ public class ParameterizedBuiltin extends Lop
                
                return sb.toString();
        }
-       
-       @Override 
-       public String getInstructions(int output_index) 
-               throws LopsException
-       {
-               StringBuilder sb = new StringBuilder();
-               sb.append( getExecType() );
-               sb.append( Lop.OPERAND_DELIMITOR );
-
-               if(_operation== OperationTypes.TRANSFORM) 
-               {
-                       sb.append( "transform" );
-                       sb.append( OPERAND_DELIMITOR );
-                       
-                       Lop iLop = 
_inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_DATA);
-                       
sb.append(iLop.prepInputOperand(getInputIndex("target")));
-                       sb.append( OPERAND_DELIMITOR );
-                       
-                       Lop iLop2 = 
_inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_MTD);
-                       sb.append(iLop2.prepScalarLabel());
-                       sb.append( OPERAND_DELIMITOR );
-                       
-                       // apply transform
-                       Lop iLop3a = 
_inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD);
-                       if ( iLop3a != null ) {
-                               sb.append("applymtd=");
-                               sb.append(iLop3a.prepScalarLabel());
-                               sb.append( OPERAND_DELIMITOR );
-                       }
-                       
-                       // transform specification (transform: mandatory, 
transformapply: optional)
-                       Lop iLop3b = 
_inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_SPEC);
-                       if ( iLop3b != null ) {
-                               sb.append("spec=");
-                               sb.append(iLop3b.prepScalarLabel());
-                               sb.append( OPERAND_DELIMITOR );
-                       }
-                       
-                       Lop iLop4 = 
_inputParams.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_OUTNAMES);
-                       if( iLop4 != null ) {
-                               sb.append("outnames=");
-                               sb.append(iLop4.prepScalarLabel());
-                               sb.append( OPERAND_DELIMITOR );
-                       }
-                       
-                       sb.append( prepOutputOperand(output_index));
-               }
-               else
-                       throw new LopsException(this.printErrorLocation() + "In 
ParameterizedBuiltin Lop, Unknown operation: " + _operation);
-       
-               return sb.toString();
-       }
-       
 
        @Override
        public String toString() {

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/Unary.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Unary.java 
b/src/main/java/org/apache/sysml/lops/Unary.java
index 087ffc4..cc53666 100644
--- a/src/main/java/org/apache/sysml/lops/Unary.java
+++ b/src/main/java/org/apache/sysml/lops/Unary.java
@@ -95,12 +95,11 @@ public class Unary extends Lop
                        lps.addCompatibility(JobType.ANY);
                        lps.removeNonPiggybackableJobs();
                        lps.removeCompatibility(JobType.CM_COV); // CM_COV 
allows only reducer instructions but this is MapOrReduce. TODO: piggybacking 
should be updated to take this extra constraint.
-                       lps.removeCompatibility(JobType.TRANSFORM);
-                       this.lps.setProperties(inputs, et, 
ExecLocation.MapOrReduce, breaksAlignment, aligner, definesMRJob);
+                       lps.setProperties(inputs, et, ExecLocation.MapOrReduce, 
breaksAlignment, aligner, definesMRJob);
                }
                else {
                        lps.addCompatibility(JobType.INVALID);
-                       this.lps.setProperties(inputs, et, 
ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob);
+                       lps.setProperties(inputs, et, 
ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob);
                }
        }
 
@@ -150,12 +149,11 @@ public class Unary extends Lop
                        lps.addCompatibility(JobType.ANY);
                        lps.removeNonPiggybackableJobs();
                        lps.removeCompatibility(JobType.CM_COV); // CM_COV 
allows only reducer instructions but this is MapOrReduce. TODO: piggybacking 
should be updated to take this extra constraint.
-                       lps.removeCompatibility(JobType.TRANSFORM);
-                       this.lps.setProperties(inputs, et, 
ExecLocation.MapOrReduce, breaksAlignment, aligner, definesMRJob);
+                       lps.setProperties(inputs, et, ExecLocation.MapOrReduce, 
breaksAlignment, aligner, definesMRJob);
                }
                else {
                        lps.addCompatibility(JobType.INVALID);
-                       this.lps.setProperties(inputs, et, 
ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob);
+                       lps.setProperties(inputs, et, 
ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java 
b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index 5816f3f..aab4303 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -54,14 +54,12 @@ import org.apache.sysml.lops.MapMult;
 import org.apache.sysml.lops.OutputParameters;
 import org.apache.sysml.lops.OutputParameters.Format;
 import org.apache.sysml.lops.PMMJ;
-import org.apache.sysml.lops.ParameterizedBuiltin;
 import org.apache.sysml.lops.PickByCount;
 import org.apache.sysml.lops.SortKeys;
 import org.apache.sysml.lops.Unary;
 import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.parser.Expression;
 import org.apache.sysml.parser.Expression.DataType;
-import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression;
 import org.apache.sysml.parser.StatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
@@ -2374,50 +2372,6 @@ public class Dag<N extends Lop>
                                
                                out.addLastInstruction(currInstr);
                        }
-                       else if(node instanceof ParameterizedBuiltin 
-                                       && ((ParameterizedBuiltin)node).getOp() 
== org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM) {
-                               
-                               ParameterizedBuiltin pbi = 
(ParameterizedBuiltin)node;
-                               Lop input = 
pbi.getNamedInput(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_DATA);
-                               if(input.getDataType()== DataType.FRAME) {
-                                       
-                                       // Output of transform is in CSV 
format, which gets subsequently reblocked 
-                                       // TODO: change it to output binaryblock
-                                       
-                                       Data dataInput = (Data) input;
-                                       
oparams.setFile_name(getNextUniqueFilename());
-                                       
oparams.setLabel(getNextUniqueVarname(DataType.MATRIX));
-
-                                       // generate an instruction that creates 
a symbol table entry for the new variable in CSV format
-                                       Data delimLop = (Data) 
dataInput.getNamedInputLop(
-                                                       
DataExpression.DELIM_DELIMITER, DataExpression.DEFAULT_DELIM_DELIMITER);
-                                       
-                                       Instruction createvarInst = 
VariableCPInstruction.prepareCreateVariableInstruction(
-                                               oparams.getLabel(), 
oparams.getFile_name(), true, 
-                                               DataType.MATRIX, 
OutputInfo.outputInfoToString(OutputInfo.CSVOutputInfo),
-                                                       new 
MatrixCharacteristics(oparams.getNumRows(), oparams.getNumCols(), -1, -1, 
oparams.getNnz()), oparams.getUpdateType(), 
-                                                       false, 
delimLop.getStringValue(), true
-                                               );
-                                       
-                                       createvarInst.setLocation(node);
-                                       
-                                       out.addPreInstruction(createvarInst);
-
-                                       // temp file as well as the variable 
has to be deleted at the end
-                                       Instruction currInstr = 
VariableCPInstruction.prepareRemoveInstruction(oparams.getLabel());
-                                       
-                                       currInstr.setLocation(node);
-                                       
-                                       out.addLastInstruction(currInstr);
-
-                                       // finally, add the generated filename 
and variable name to the list of outputs
-                                       out.setFileName(oparams.getFile_name());
-                                       out.setVarName(oparams.getLabel());
-                               }
-                               else {
-                                       throw new LopsException("Input to 
transform() has an invalid type: " + input.getDataType() + ", it must be 
FRAME.");
-                               }
-                       }
                        else if(!(node instanceof FunctionCallCP)) //general 
case
                        {
                                // generate temporary filename and a variable 
name to hold the
@@ -2913,7 +2867,7 @@ public class Dag<N extends Lop>
                 */
                
                // 
-               if ( jt != JobType.REBLOCK && jt != JobType.CSV_REBLOCK && jt 
!= JobType.DATAGEN && jt != JobType.TRANSFORM) {
+               if ( jt != JobType.REBLOCK && jt != JobType.CSV_REBLOCK && jt 
!= JobType.DATAGEN ) {
                        for (int i=0; i < inputInfos.size(); i++)
                                if ( inputInfos.get(i) == 
InputInfo.BinaryCellInputInfo || inputInfos.get(i) == 
InputInfo.TextCellInputInfo )
                                        cellModeOverride = true;
@@ -3129,9 +3083,7 @@ public class Dag<N extends Lop>
                }
 
                if (node.getExecLocation() == ExecLocation.Data ) {
-                       if ( ((Data)node).getFileFormatType() == 
FileFormatTypes.CSV 
-                                       && !(node.getInputs().get(0) instanceof 
ParameterizedBuiltin 
-                                                       && 
((ParameterizedBuiltin)node.getInputs().get(0)).getOp() == 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM)) {
+                       if ( ((Data)node).getFileFormatType() == 
FileFormatTypes.CSV ) {
                                // Generate write instruction, which goes into 
CSV_WRITE Job
                                int output_index = start_index[0];
                                
shuffleInstructions.add(node.getInstructions(inputIndices.get(0), 
output_index));
@@ -3171,12 +3123,6 @@ public class Dag<N extends Lop>
                                break;
                                
                        case ParameterizedBuiltin:
-                               if( ((ParameterizedBuiltin)node).getOp() == 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM ) {
-                                       
shuffleInstructions.add(node.getInstructions(output_index));
-                                       if(DMLScript.ENABLE_DEBUG_MODE) {
-                                               
MRJobLineNumbers.add(node._beginLine);
-                                       }
-                               }
                                break;
                                
                        /* Lop types that take two inputs */

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/compile/JobType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/JobType.java 
b/src/main/java/org/apache/sysml/lops/compile/JobType.java
index 0ca5b49..492be7e 100644
--- a/src/main/java/org/apache/sysml/lops/compile/JobType.java
+++ b/src/main/java/org/apache/sysml/lops/compile/JobType.java
@@ -22,7 +22,6 @@ package org.apache.sysml.lops.compile;
 import org.apache.sysml.hops.Hop.FileFormatTypes;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.lops.Data;
-import org.apache.sysml.lops.ParameterizedBuiltin;
 import org.apache.sysml.runtime.DMLRuntimeException;
 
 
@@ -70,8 +69,7 @@ public enum JobType
        DATA_PARTITION  (11, "DATAPARTITION",   false,                  false,  
                                                        true),
        CSV_REBLOCK             (12, "CSV_REBLOCK",     false,                  
false,                                                          false),
        CSV_WRITE               (13, "CSV_WRITE",               false,          
        false,                                                          true),
-       TRANSFORM               (14, "TRANSFORM",               false,          
        true,                                                           false),
-       GMRCELL                 (15, "GMRCELL",                 false,          
        false,                                                          false);
+       GMRCELL                 (14, "GMRCELL",                 false,          
        false,                                                          false);
 
 
        
@@ -140,8 +138,6 @@ public enum JobType
                                return Lop.Type.MMRJ;
                        else if ( getName().equals("SORT") )
                                return Lop.Type.SortKeys;
-                       else if ( getName().equals("TRANSFORM"))
-                               return Lop.Type.ParameterizedBuiltin;
                        else 
                                throw new DMLRuntimeException("Shuffle Lop Type 
is not defined for a job (" + getName() + ") that allows a single shuffle 
instruction.");
                }
@@ -178,19 +174,13 @@ public enum JobType
                
                case CSVReBlock:        return JobType.CSV_REBLOCK;
                
-               case ParameterizedBuiltin:              
-                               if( ((ParameterizedBuiltin)node).getOp() == 
ParameterizedBuiltin.OperationTypes.TRANSFORM )
-                                       return JobType.TRANSFORM;
-               
                case Data:
                        /*
                         * Only Write LOPs with external data formats (except 
MatrixMarket) produce MR Jobs
                         */
                        FileFormatTypes fmt = ((Data) node).getFileFormatType();
-                       if ( fmt == FileFormatTypes.CSV )
-                               return JobType.CSV_WRITE;
-                       else
-                               return null;
+                       return ( fmt == FileFormatTypes.CSV ) ?
+                               JobType.CSV_WRITE : null;
                        
                default:
                        return null; 
@@ -205,7 +195,7 @@ public enum JobType
                else {
                        if ( getName().equals("MMCJ") )
                                return false;
-                       else if ( getName().equals("MMRJ") || 
getName().equals("SORT")  || getName().equals("TRANSFORM"))
+                       else if ( getName().equals("MMRJ") || 
getName().equals("SORT"))
                                return true;
                        else 
                                throw new DMLRuntimeException("Implementation 
for isCompatibleWithParentNodes() is missing for a job (" + getName() + ") that 
allows a single shuffle instruction.");

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java 
b/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java
index 71de7c8..5cd0f60 100644
--- a/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java
+++ b/src/main/java/org/apache/sysml/lops/runtime/RunMRJobs.java
@@ -22,7 +22,6 @@ package org.apache.sysml.lops.runtime;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.wink.json4j.JSONException;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
@@ -69,7 +68,6 @@ import org.apache.sysml.runtime.matrix.data.LibMatrixDatagen;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.data.RandomMatrixGenerator;
-import org.apache.sysml.runtime.transform.DataTransform;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.utils.Statistics;
 
@@ -305,25 +303,7 @@ public class RunMRJobs
                        case DATA_PARTITION:
                                ret = DataPartitionMR.runJob(inst, 
inputMatrices, shuffleInst, inst.getIv_resultIndices(), outputMatrices, 
inst.getIv_numReducers(), inst.getIv_replication());
                                break;
-                               
-                       case TRANSFORM:
-                               
-                               if(    
ConfigurationManager.isDynamicRecompilation()
-                                               && 
OptimizerUtils.ALLOW_TRANSFORM_RECOMPILE
-                                               && DMLScript.rtplatform != 
RUNTIME_PLATFORM.HADOOP 
-                                               && Recompiler.checkCPTransform( 
inst, inputMatrices ) ) 
-                                       {
-                                               // transform the data and 
generate output in CSV format
-                                               ret = 
executeInMemoryTransform(inst, inputMatrices, outputMatrices);
-                                               
Statistics.decrementNoOfExecutedMRJobs();
-                                               execCP = true;
-                                       }
-                                       else 
-                                       {
-                                               ret = 
DataTransform.mrDataTransform(inst, inputMatrices, shuffleInst, otherInst, 
inst.getIv_resultIndices(), outputMatrices, inst.getIv_numReducers(), 
inst.getIv_replication());
-                                       }
-                               break;
-                               
+                       
                        default:
                                throw new DMLRuntimeException("Invalid jobtype: 
" + inst.getJobType());
                        }
@@ -360,11 +340,12 @@ public class RunMRJobs
                                                
                                                
outputMatrices[i].setHDFSFileExists(true);
                                                
-                                               if ( inst.getJobType() != 
JobType.CSV_WRITE && inst.getJobType() != JobType.TRANSFORM) {
+                                               if ( inst.getJobType() != 
JobType.CSV_WRITE ) {
                                                        // write out metadata 
file
                                                        // Currently, valueType 
information in not stored in MR instruction, 
                                                        // since only DOUBLE 
matrices are supported ==> hard coded the value type information for now
-                                                       
MapReduceTool.writeMetaDataFile(fname + ".mtd", ValueType.DOUBLE,  
((MatrixDimensionsMetaData)ret.getMetaData(i)).getMatrixCharacteristics(), 
outinfo);
+                                                       
MapReduceTool.writeMetaDataFile(fname + ".mtd", ValueType.DOUBLE,  
+                                                               
((MatrixDimensionsMetaData)ret.getMetaData(i)).getMatrixCharacteristics(), 
outinfo);
                                                }
                                        }
                                }
@@ -558,11 +539,4 @@ public class RunMRJobs
                
                return  new JobReturn( mc, inst.getOutputInfos(), true);
        }
-       
-       private static JobReturn executeInMemoryTransform( MRJobInstruction 
inst, MatrixObject[] inputMatrices, MatrixObject[] outputMatrices) throws 
IOException, DMLRuntimeException, IllegalArgumentException, JSONException {
-               return DataTransform.cpDataTransform(
-                               inst.getIv_shuffleInstructions(), 
-                               inputMatrices, 
-                               outputMatrices);
-       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index a40c736..75f11b5 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -1920,13 +1920,6 @@ public class DMLTranslator
                        currBuiltinOp = new ReorgOp(target.getName(), 
target.getDataType(), target.getValueType(), ReOrgOp.SORT, inputs);
                        
                        break;
-                       
-               case TRANSFORM:
-                       currBuiltinOp = new ParameterizedBuiltinOp(
-                                                                       
target.getName(), target.getDataType(), 
-                                                                       
target.getValueType(), ParamBuiltinOp.TRANSFORM, 
-                                                                       
paramHops);
-                       break;  
                
                case TRANSFORMAPPLY:
                        currBuiltinOp = new ParameterizedBuiltinOp(

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/parser/Expression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/Expression.java 
b/src/main/java/org/apache/sysml/parser/Expression.java
index 01a0a77..9ee3fba 100644
--- a/src/main/java/org/apache/sysml/parser/Expression.java
+++ b/src/main/java/org/apache/sysml/parser/Expression.java
@@ -139,7 +139,7 @@ public abstract class Expression
                GROUPEDAGG, RMEMPTY, REPLACE, ORDER, 
                // Distribution Functions
                CDF, INVCDF, PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, 
PEXP, QEXP,
-               TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMENCODE, 
TRANSFORMMETA,
+               TRANSFORMAPPLY, TRANSFORMDECODE, TRANSFORMENCODE, TRANSFORMMETA,
                TOSTRING,       // The "toString" method for DML; named 
arguments accepted to format output
                INVALID
        };

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
 
b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
index aa888d3..4a3da28 100644
--- 
a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
+++ 
b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 
 import org.apache.sysml.hops.Hop.ParamBuiltinOp;
-import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.parser.LanguageException.LanguageErrorCodes;
 
 
@@ -39,8 +38,6 @@ public class ParameterizedBuiltinFunctionExpression extends 
DataIdentifier
        public static final String TF_FN_PARAM_MTD2 = "meta";
        public static final String TF_FN_PARAM_SPEC = "spec";
        public static final String TF_FN_PARAM_MTD = "transformPath"; //NOTE 
MB: for backwards compatibility
-       public static final String TF_FN_PARAM_APPLYMTD = "applyTransformPath";
-       public static final String TF_FN_PARAM_OUTNAMES = "outputNames";
        
        private static HashMap<String, 
Expression.ParameterizedBuiltinFunctionOp> opcodeMap;
        static {
@@ -67,7 +64,6 @@ public class ParameterizedBuiltinFunctionExpression extends 
DataIdentifier
                opcodeMap.put("qexp",   
Expression.ParameterizedBuiltinFunctionOp.QEXP);
 
                // data transformation functions
-               opcodeMap.put("transform",      
Expression.ParameterizedBuiltinFunctionOp.TRANSFORM);
                opcodeMap.put("transformapply", 
Expression.ParameterizedBuiltinFunctionOp.TRANSFORMAPPLY);
                opcodeMap.put("transformdecode", 
Expression.ParameterizedBuiltinFunctionOp.TRANSFORMDECODE);
                opcodeMap.put("transformencode", 
Expression.ParameterizedBuiltinFunctionOp.TRANSFORMENCODE);
@@ -222,15 +218,11 @@ public class ParameterizedBuiltinFunctionExpression 
extends DataIdentifier
                case ORDER:
                        validateOrder(output, conditional);
                        break;
-
-               case TRANSFORM:
-                       validateTransform(output, conditional);
-                       break;
                
                case TRANSFORMAPPLY:
                        validateTransformApply(output, conditional);
                        break;
-                       
+               
                case TRANSFORMDECODE:
                        validateTransformDecode(output, conditional);
                        break;  
@@ -289,64 +281,6 @@ public class ParameterizedBuiltinFunctionExpression 
extends DataIdentifier
                return;
        }
        
-       // example: A = transform(data=D, txmtd="", txspec="")
-       private void validateTransform(DataIdentifier output, boolean conditional) 
-               throws LanguageException 
-       {
-               //validate data
-               checkDataType("transform", TF_FN_PARAM_DATA, DataType.FRAME, conditional);
-               
-               Expression txmtd = getVarParam(TF_FN_PARAM_MTD);
-               if( txmtd==null ) {
-                       raiseValidateError("Named parameter '" + TF_FN_PARAM_MTD + "' missing. Please specify the transformation metadata file path.", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-               }
-               else if( txmtd.getOutput().getDataType() != DataType.SCALAR || txmtd.getOutput().getValueType() != ValueType.STRING ){
-                       raiseValidateError("Transformation metadata file '" + TF_FN_PARAM_MTD + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-               }
-               
-               Expression txspec = getVarParam(TF_FN_PARAM_SPEC);
-               Expression applyMTD = getVarParam(TF_FN_PARAM_APPLYMTD);
-               if( txspec==null ) {
-                       if ( applyMTD == null )
-                               raiseValidateError("Named parameter '" + TF_FN_PARAM_SPEC + "' missing. Please specify the transformation specification (JSON string).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-               }
-               else if( txspec.getOutput().getDataType() != DataType.SCALAR  || txspec.getOutput().getValueType() != ValueType.STRING ){
-                       raiseValidateError("Transformation specification '" + TF_FN_PARAM_SPEC + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-               }
-               
-               if ( applyMTD != null ) {
-                       if( applyMTD.getOutput().getDataType() != DataType.SCALAR  || applyMTD.getOutput().getValueType() != ValueType.STRING ){
-                               raiseValidateError("Apply transformation metadata file'" + TF_FN_PARAM_APPLYMTD + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-                       }
-                       
-                       //NOTE: txspec can still be optionally specified; if specified it takes precedence over 
-                       // specification persisted in txmtd during transform.
-               }
-               
-               Expression outNames = getVarParam(TF_FN_PARAM_OUTNAMES);
-               if ( outNames != null ) {
-                       if( outNames.getOutput().getDataType() != DataType.SCALAR || outNames.getOutput().getValueType() != ValueType.STRING )
-                               raiseValidateError("The parameter specifying column names in the output file '" + TF_FN_PARAM_MTD + "' must be a string value (a scalar).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-                       if ( applyMTD != null)
-                               raiseValidateError("Only one of '" + TF_FN_PARAM_APPLYMTD + "' or '" + TF_FN_PARAM_OUTNAMES + "' can be specified in transform().", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
-               }
-               
-               // disable frame csv reblocks as transform operates directly over csv files
-               // (this is required to support both file-based transform and frame-based
-               // transform at the same time; hence, transform and frame-based transform
-               // functions over csv cannot be used in the same script; accordingly we
-               // give an appropriate warning)
-               OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = false;
-               raiseValidateError("Disable frame csv reblock to support file-based transform.", true);
-               
-               // Output is a matrix with same dims as input
-               output.setDataType(DataType.MATRIX);
-               output.setFormatType(FormatType.CSV);
-               output.setValueType(ValueType.DOUBLE);
-               // Output dimensions may not be known at compile time, for example when dummycoding.
-               output.setDimensions(-1, -1);
-       }
-       
        // example: A = transformapply(target=X, meta=M, spec=s)
        private void validateTransformApply(DataIdentifier output, boolean 
conditional) 
                throws LanguageException 

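The surviving validateTransformApply/Decode/Encode methods keep the same named-parameter validation pattern that the removed validateTransform used: look up the parameter, require it to be present, and require it to be a string scalar. The self-contained sketch below mirrors that pattern with hypothetical stand-in types (Param, DataType, ValueType), not SystemML's Expression/DataIdentifier classes.

import java.util.HashMap;
import java.util.Map;

public class ParamCheckSketch {
    enum DataType { SCALAR, MATRIX, FRAME }
    enum ValueType { STRING, DOUBLE, INT, BOOLEAN }

    // Stand-in for the output identifier of a named-parameter expression.
    static class Param {
        final DataType dt; final ValueType vt;
        Param(DataType dt, ValueType vt) { this.dt = dt; this.vt = vt; }
    }

    // Mirrors the "must be present and a string scalar" check used by the
    // remaining transform* validation methods.
    static void checkStringScalar(Map<String, Param> params, String name) {
        Param p = params.get(name);
        if (p == null)
            throw new IllegalArgumentException("Named parameter '" + name + "' missing.");
        if (p.dt != DataType.SCALAR || p.vt != ValueType.STRING)
            throw new IllegalArgumentException("Parameter '" + name + "' must be a string scalar.");
    }

    public static void main(String[] args) {
        Map<String, Param> params = new HashMap<>();
        params.put("spec", new Param(DataType.SCALAR, ValueType.STRING));
        checkStringScalar(params, "spec");    // passes
        // checkStringScalar(params, "meta"); // would throw: parameter missing
    }
}
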
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
 
b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
index eb69068..8635833 100644
--- 
a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
+++ 
b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
@@ -46,7 +46,7 @@ public class ParameterizedBuiltin extends ValueFunction
        
        public enum ParameterizedBuiltinCode { 
                CDF, INVCDF, RMEMPTY, REPLACE, REXPAND,
-               TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE };
+               TRANSFORMAPPLY, TRANSFORMDECODE };
        public enum ProbabilityDistributionCode { 
                INVALID, NORMAL, EXP, CHISQ, F, T };
        
@@ -62,7 +62,6 @@ public class ParameterizedBuiltin extends ValueFunction
                String2ParameterizedBuiltinCode.put( "rmempty", 
ParameterizedBuiltinCode.RMEMPTY);
                String2ParameterizedBuiltinCode.put( "replace", 
ParameterizedBuiltinCode.REPLACE);
                String2ParameterizedBuiltinCode.put( "rexpand", 
ParameterizedBuiltinCode.REXPAND);
-               String2ParameterizedBuiltinCode.put( "transform", 
ParameterizedBuiltinCode.TRANSFORM);
                String2ParameterizedBuiltinCode.put( "transformapply", 
ParameterizedBuiltinCode.TRANSFORMAPPLY);
                String2ParameterizedBuiltinCode.put( "transformdecode", 
ParameterizedBuiltinCode.TRANSFORMDECODE);
        }
@@ -169,9 +168,6 @@ public class ParameterizedBuiltin extends ValueFunction
                        case REXPAND:
                                return new 
ParameterizedBuiltin(ParameterizedBuiltinCode.REXPAND);
                        
-                       case TRANSFORM:
-                               return new 
ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORM);
-                       
                        case TRANSFORMAPPLY:
                                return new 
ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMAPPLY);
                        

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
index 52c11e6..9126ab7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
@@ -191,7 +191,6 @@ public class CPInstructionParser extends InstructionParser
                String2CPInstructionType.put( "rmempty"     , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2CPInstructionType.put( "replace"     , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2CPInstructionType.put( "rexpand"     , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
-               String2CPInstructionType.put( "transform"       , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2CPInstructionType.put( 
"transformapply",CPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2CPInstructionType.put( 
"transformdecode",CPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2CPInstructionType.put( 
"transformencode",CPINSTRUCTION_TYPE.MultiReturnParameterizedBuiltin);

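The parser changes above all follow the same idea: instruction parsing is driven by a static opcode-to-type table, so removing the file-based transform is just a matter of dropping its map entry. A minimal, self-contained sketch of that dispatch pattern (hypothetical enum and class names, not the actual CPInstructionParser):

import java.util.HashMap;
import java.util.Map;

public class OpcodeDispatchSketch {
    enum CPType { ParameterizedBuiltin, MultiReturnParameterizedBuiltin, Invalid }

    // Static opcode-to-type table, in the spirit of String2CPInstructionType;
    // after this patch the file-based "transform" opcode is simply absent.
    private static final Map<String, CPType> OPCODES = new HashMap<>();
    static {
        OPCODES.put("transformapply",  CPType.ParameterizedBuiltin);
        OPCODES.put("transformdecode", CPType.ParameterizedBuiltin);
        OPCODES.put("transformencode", CPType.MultiReturnParameterizedBuiltin);
    }

    static CPType lookup(String opcode) {
        return OPCODES.getOrDefault(opcode, CPType.Invalid);
    }

    public static void main(String[] args) {
        System.out.println(lookup("transformencode")); // MultiReturnParameterizedBuiltin
        System.out.println(lookup("transform"));       // Invalid: opcode removed
    }
}
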
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java
index 0b9cb7d..5ced2ab 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/MRInstructionParser.java
@@ -288,7 +288,6 @@ public class MRInstructionParser extends InstructionParser
                //dummy (pseudo instructions)
                String2MRInstructionType.put( "sort", MRINSTRUCTION_TYPE.Sort);
                String2MRInstructionType.put( "csvwrite", 
MRINSTRUCTION_TYPE.CSVWrite);
-               String2MRInstructionType.put( "transform", 
MRINSTRUCTION_TYPE.Transform);
                
                //parameterized builtins
                String2MRInstructionType.put( "replace", 
MRINSTRUCTION_TYPE.ParameterizedBuiltin);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 5ca3847..a2b0ef1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -230,7 +230,6 @@ public class SPInstructionParser extends InstructionParser
                String2SPInstructionType.put( "rmempty"      , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( "replace"      , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( "rexpand"      , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
-               String2SPInstructionType.put( "transform"    , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( 
"transformapply",SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( 
"transformdecode",SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( 
"transformencode",SPINSTRUCTION_TYPE.MultiReturnBuiltin);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index a31fe94..847d5f9 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -33,12 +33,10 @@ import 
org.apache.sysml.runtime.functionobjects.ParameterizedBuiltin;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
-import org.apache.sysml.runtime.matrix.JobReturn;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
-import org.apache.sysml.runtime.transform.DataTransform;
 import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.transform.decode.Decoder;
 import org.apache.sysml.runtime.transform.decode.DecoderFactory;
@@ -141,8 +139,7 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
                        func = 
ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
                        return new ParameterizedBuiltinCPInstruction(new 
SimpleOperator(func), paramsMap, out, opcode, str);
                }
-               else if (   opcode.equals("transform")
-                                || opcode.equals("transformapply")
+               else if (   opcode.equals("transformapply")
                                 || opcode.equals("transformdecode")
                                 || opcode.equals("transformmeta")) 
                {
@@ -255,16 +252,6 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
                        ec.setMatrixOutput(output.getName(), ret);
                        ec.releaseMatrixInput(params.get("target"));
                }
-               else if ( opcode.equalsIgnoreCase("transform")) {
-                       FrameObject fo = 
ec.getFrameObject(params.get("target"));
-                       MatrixObject out = 
ec.getMatrixObject(output.getName());                        
-                       try {
-                               JobReturn jt = 
DataTransform.cpDataTransform(this, new FrameObject[] { fo } , new 
MatrixObject[] {out} );
-                               
out.updateMatrixCharacteristics(jt.getMatrixCharacteristics(0));
-                       } catch (Exception e) {
-                               throw new DMLRuntimeException(e);
-                       }
-               }
                else if ( opcode.equalsIgnoreCase("transformapply")) {
                        //acquire locks
                        FrameBlock data = 
ec.getFrameInput(params.get("target"));

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java
index 68132fc..ad98ad7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/mr/MRInstruction.java
@@ -32,7 +32,7 @@ public abstract class MRInstruction extends Instruction
 {
        
        public enum MRINSTRUCTION_TYPE { INVALID, Append, Aggregate, 
ArithmeticBinary, ArithmeticBinary2, AggregateBinary, AggregateUnary, 
-               Rand, Seq, CSVReblock, CSVWrite, Transform,
+               Rand, Seq, CSVReblock, CSVWrite, 
                Reblock, Reorg, Replicate, Unary, CombineBinary, CombineUnary, 
CombineTernary, PickByCount, Partition,
                Ternary, Quaternary, CM_N_COV, MapGroupedAggregate, 
GroupedAggregate, RangeReIndex, ZeroOut, MMTSJ, PMMJ, MatrixReshape, 
ParameterizedBuiltin, Sort, MapMultChain,
                CumsumAggregate, CumsumSplit, CumsumOffset, BinUaggChain, 
UaggOuterChain, RemoveEmpty}; 

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 5890bf9..254c3d7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -56,12 +56,12 @@ import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.transform.MVImputeAgent;
-import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
-import org.apache.sysml.runtime.transform.RecodeAgent;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.transform.encode.EncoderComposite;
 import org.apache.sysml.runtime.transform.encode.EncoderFactory;
+import org.apache.sysml.runtime.transform.encode.EncoderMVImpute;
+import org.apache.sysml.runtime.transform.encode.EncoderRecode;
+import org.apache.sysml.runtime.transform.encode.EncoderMVImpute.MVMethod;
 import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.transform.meta.TfOffsetMap;
 
@@ -129,7 +129,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                        .distinct().groupByKey()
                                        .flatMap(new 
TransformEncodeGroupFunction(accMax));
                        if( containsMVImputeEncoder(encoderBuild) ) {
-                               MVImputeAgent mva = 
getMVImputeEncoder(encoderBuild);
+                               EncoderMVImpute mva = 
getMVImputeEncoder(encoderBuild);
                                rcMaps = rcMaps.union(
                                                in.mapPartitionsToPair(new 
TransformEncodeBuild2Function(mva))
                                                  .groupByKey().flatMap(new 
TransformEncodeGroup2Function(mva)) );
@@ -149,7 +149,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                omap = new 
TfOffsetMap(SparkUtils.toIndexedLong(in.mapToPair(
                                        new 
RDDTransformApplyOffsetFunction(spec, colnames)).collect()));
                        }
-                               
+                       
                        //create encoder broadcast (avoiding replication per 
task) 
                        Encoder encoder = EncoderFactory.createEncoder(spec, 
colnames,
                                        fo.getSchema(), 
(int)fo.getNumColumns(), meta);
@@ -176,16 +176,16 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
        private boolean containsMVImputeEncoder(Encoder encoder) {
                if( encoder instanceof EncoderComposite )
                        for( Encoder cencoder : 
((EncoderComposite)encoder).getEncoders() )
-                               if( cencoder instanceof MVImputeAgent )
+                               if( cencoder instanceof EncoderMVImpute )
                                        return true;
                return false;   
        }
 
-       private MVImputeAgent getMVImputeEncoder(Encoder encoder) {
+       private EncoderMVImpute getMVImputeEncoder(Encoder encoder) {
                if( encoder instanceof EncoderComposite )
                        for( Encoder cencoder : 
((EncoderComposite)encoder).getEncoders() )
-                               if( cencoder instanceof MVImputeAgent )
-                                       return (MVImputeAgent) cencoder;
+                               if( cencoder instanceof EncoderMVImpute )
+                                       return (EncoderMVImpute) cencoder;
                return null;    
        }
        
@@ -248,12 +248,12 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
        {
                private static final long serialVersionUID = 
6336375833412029279L;
 
-               private RecodeAgent _raEncoder = null;
+               private EncoderRecode _raEncoder = null;
                
                public TransformEncodeBuildFunction(Encoder encoder) {
                        for( Encoder cEncoder : 
((EncoderComposite)encoder).getEncoders() )
-                               if( cEncoder instanceof RecodeAgent )
-                                       _raEncoder = (RecodeAgent)cEncoder;
+                               if( cEncoder instanceof EncoderRecode )
+                                       _raEncoder = (EncoderRecode)cEncoder;
                }
                
                @Override
@@ -310,7 +310,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                sb.append(' ');
                                sb.append(colID);
                                sb.append(' ');
-                               sb.append(RecodeAgent.constructRecodeMapEntry(
+                               sb.append(EncoderRecode.constructRecodeMapEntry(
                                                iter.next().toString(), rowID));
                                ret.add(sb.toString());
                                sb.setLength(0); 
@@ -326,9 +326,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
        {
                private static final long serialVersionUID = 
6336375833412029279L;
 
-               private MVImputeAgent _encoder = null;
+               private EncoderMVImpute _encoder = null;
                
-               public TransformEncodeBuild2Function(MVImputeAgent encoder) {
+               public TransformEncodeBuild2Function(EncoderMVImpute encoder) {
                        _encoder = encoder;
                }
                
@@ -370,9 +370,9 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
        {
                private static final long serialVersionUID = 
702100641492347459L;
                
-               private MVImputeAgent _encoder = null;
+               private EncoderMVImpute _encoder = null;
                
-               public TransformEncodeGroup2Function(MVImputeAgent encoder) {   
+               public TransformEncodeGroup2Function(EncoderMVImpute encoder) { 
                        _encoder = encoder;
                }
 

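The MVImputeAgent/RecodeAgent to EncoderMVImpute/EncoderRecode renames above keep the same lookup idiom: scan an EncoderComposite for the first sub-encoder of a given type (containsMVImputeEncoder/getMVImputeEncoder). The following self-contained sketch generalizes that idiom; the tiny Encoder classes here are stand-ins, not the real ones in org.apache.sysml.runtime.transform.encode.

import java.util.Arrays;
import java.util.List;

public class EncoderLookupSketch {
    interface Encoder { }
    static class EncoderRecode implements Encoder { }
    static class EncoderMVImpute implements Encoder { }
    static class EncoderComposite implements Encoder {
        private final List<Encoder> encoders;
        EncoderComposite(Encoder... encoders) { this.encoders = Arrays.asList(encoders); }
        List<Encoder> getEncoders() { return encoders; }
    }

    // Generic version of the containsMVImputeEncoder/getMVImputeEncoder pattern:
    // return the first sub-encoder of the requested type, or null if absent.
    static <T extends Encoder> T getEncoder(Encoder encoder, Class<T> type) {
        if (encoder instanceof EncoderComposite)
            for (Encoder sub : ((EncoderComposite) encoder).getEncoders())
                if (type.isInstance(sub))
                    return type.cast(sub);
        return null;
    }

    public static void main(String[] args) {
        Encoder composite = new EncoderComposite(new EncoderRecode(), new EncoderMVImpute());
        System.out.println(getEncoder(composite, EncoderMVImpute.class) != null); // true
        System.out.println(getEncoder(composite, EncoderRecode.class)   != null); // true
    }
}
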
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 2a80cfc..179ef9e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -38,7 +38,6 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.parser.Statement;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -73,7 +72,6 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
-import org.apache.sysml.runtime.transform.DataTransform;
 import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.transform.decode.Decoder;
 import org.apache.sysml.runtime.transform.decode.DecoderFactory;
@@ -173,7 +171,6 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        }
                        else if(   opcode.equalsIgnoreCase("rexpand") 
                                        || opcode.equalsIgnoreCase("replace")
-                                       || opcode.equalsIgnoreCase("transform")
                                        || 
opcode.equalsIgnoreCase("transformapply")
                                        || 
opcode.equalsIgnoreCase("transformdecode")) 
                        {
@@ -432,17 +429,6 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                        mcOut.set(dirRows?lmaxVal:mcIn.getRows(), 
dirRows?mcIn.getRows():lmaxVal, (int)brlen, (int)bclen, -1);
                }
-               else if ( opcode.equalsIgnoreCase("transform") ) 
-               {
-                       // perform data transform on Spark
-                       try {
-                               DataTransform.spDataTransform( this, 
-                                               new FrameObject[] { 
sec.getFrameObject(params.get("target")) }, 
-                                               new MatrixObject[] { 
sec.getMatrixObject(output.getName()) }, ec);
-                       } catch (Exception e) {
-                               throw new DMLRuntimeException(e);
-                       }
-               }
                else if ( opcode.equalsIgnoreCase("transformapply") ) 
                {
                        //get input RDD and meta data

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index c30c85f..8bfee87 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -56,10 +56,6 @@ public class WriteSPInstruction extends SPInstruction
        private CPOperand input3 = null;
        private CPOperand input4 = null;
        private FileFormatProperties formatProperties;
-       
-       //scalars might occur for transform
-       // TODO remove once transform over frames supported
-       private boolean isInputMatrixBlock = true; 
 
        public WriteSPInstruction(CPOperand in1, CPOperand in2, CPOperand in3, 
String opcode, String str) {
                super(opcode, str);
@@ -100,10 +96,6 @@ public class WriteSPInstruction extends SPInstruction
                        boolean sparse = Boolean.parseBoolean(parts[6]);
                        FileFormatProperties formatProperties = new 
CSVFileFormatProperties(hasHeader, delim, sparse);
                        inst.setFormatProperties(formatProperties);
-                       
-                       boolean isInputMB = Boolean.parseBoolean(parts[7]);
-                       inst.setInputMatrixBlock(isInputMB);
-
                        CPOperand in4 = new CPOperand(parts[8]);
                        inst.input4 = in4;
                } else {
@@ -125,14 +117,6 @@ public class WriteSPInstruction extends SPInstruction
                formatProperties = prop;
        }
        
-       public void setInputMatrixBlock(boolean isMB) {
-               isInputMatrixBlock = isMB;
-       }
-       
-       public boolean isInputMatrixBlock() {
-               return isInputMatrixBlock;
-       }
-       
        @Override
        public void processInstruction(ExecutionContext ec)
                throws DMLRuntimeException 
@@ -180,7 +164,7 @@ public class WriteSPInstruction extends SPInstruction
                {
                        //piggyback nnz maintenance on write
                        LongAccumulator aNnz = null;
-                       if ( isInputMatrixBlock && !mc.nnzKnown() ) {
+                       if( !mc.nnzKnown() ) {
                                aNnz = 
sec.getSparkContext().sc().longAccumulator("nnz");
                                in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
                        }
@@ -202,57 +186,25 @@ public class WriteSPInstruction extends SPInstruction
                        else
                                customSaveTextFile(ijv, fname, false);
                        
-                       if ( isInputMatrixBlock && !mc.nnzKnown() )
+                       if( !mc.nnzKnown() )
                                mc.setNonZeros( aNnz.value() );
                }
                else if( oi == OutputInfo.CSVOutputInfo ) 
                {
-                       JavaRDD<String> out = null;
                        LongAccumulator aNnz = null;
                        
-                       if ( isInputMatrixBlock ) {
-                               //piggyback nnz computation on actual write
-                               if( !mc.nnzKnown() ) {
-                                       aNnz = 
sec.getSparkContext().sc().longAccumulator("nnz");
-                                       in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
-                               }       
-                               
-                               out = RDDConverterUtils.binaryBlockToCsv(in1, 
mc, 
-                                               (CSVFileFormatProperties) 
formatProperties, true);
-                       }
-                       else 
-                       {
-                               // This case is applicable when the CSV output 
from transform() is written out
-                               // TODO remove once transform over frames 
supported
-                               @SuppressWarnings("unchecked")
-                               JavaPairRDD<Long,String> rdd = 
(JavaPairRDD<Long, String>) 
(sec.getMatrixObject(input1.getName())).getRDDHandle().getRDD();
-                               out = rdd.values(); 
-
-                               String sep = ",";
-                               boolean hasHeader = false;
-                               if(formatProperties != null) {
-                                       sep = ((CSVFileFormatProperties) 
formatProperties).getDelim();
-                                       hasHeader = ((CSVFileFormatProperties) 
formatProperties).hasHeader();
-                               }
-                               
-                               if(hasHeader) {
-                                       StringBuffer buf = new StringBuffer();
-                               for(int j = 1; j < mc.getCols(); j++) {
-                                       if(j != 1) {
-                                               buf.append(sep);
-                                       }
-                                       buf.append("C" + j);
-                               }
-                               ArrayList<String> headerContainer = new 
ArrayList<String>(1);
-                               headerContainer.add(0, buf.toString());
-                               JavaRDD<String> header = 
sec.getSparkContext().parallelize(headerContainer);
-                               out = header.union(out);
-                               }
-                       }
+                       //piggyback nnz computation on actual write
+                       if( !mc.nnzKnown() ) {
+                               aNnz = 
sec.getSparkContext().sc().longAccumulator("nnz");
+                               in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
+                       }       
                        
+                       JavaRDD<String> out = 
RDDConverterUtils.binaryBlockToCsv(
+                               in1, mc, (CSVFileFormatProperties) 
formatProperties, true);
+
                        customSaveTextFile(out, fname, false);
                        
-                       if( isInputMatrixBlock && !mc.nnzKnown() )
+                       if( !mc.nnzKnown() )
                                mc.setNonZeros((long)aNnz.value().longValue());
                }
                else if( oi == OutputInfo.BinaryBlockOutputInfo ) {

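With the transform-specific CSV branch gone, WriteSPInstruction always piggybacks the nnz computation on the write pass via a Spark accumulator. The toy example below shows that accumulator-in-mapValues pattern on its own; it assumes the Spark 2.x Java API (LongAccumulator, JavaSparkContext) and uses made-up "blocks" instead of SystemML's binary blocks and ComputeBinaryBlockNnzFunction.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

import java.util.Arrays;

public class NnzPiggybackSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("nnz-sketch").setMaster("local[1]");
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // Toy "blocks": (blockId, values); zeros do not count towards nnz.
            JavaPairRDD<Long, double[]> blocks = jsc.parallelizePairs(Arrays.asList(
                new Tuple2<>(1L, new double[]{0, 1, 2}),
                new Tuple2<>(2L, new double[]{0, 0, 3})));

            // Piggyback the nnz computation on the pass that produces the output,
            // analogous to ComputeBinaryBlockNnzFunction before binaryBlockToCsv.
            LongAccumulator aNnz = jsc.sc().longAccumulator("nnz");
            JavaPairRDD<Long, double[]> counted = blocks.mapValues(v -> {
                long nnz = Arrays.stream(v).filter(d -> d != 0).count();
                aNnz.add(nnz);
                return v;
            });

            counted.count(); // stand-in for the action that triggers the actual write
            System.out.println("nnz = " + aNnz.value()); // 3
        }
    }
}
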
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
index b976545..3bd9758 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CSVReblockMR.java
@@ -55,7 +55,6 @@ import 
org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
 import 
org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
-import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 
@@ -281,17 +280,9 @@ public class CSVReblockMR
                                otherInstructionsInReducer, numReducers, 
replication, resultIndexes, outputs, outputInfos, ret1.counterFile, 
smallestFiles);
                return ret;
        }
-       
-       public static AssignRowIDMRReturn runAssignRowIDMRJob(String[] inputs, 
InputInfo[] inputInfos, int[] brlens, int[] bclens, 
-                       String reblockInstructions, int replication, String[] 
smallestFiles) 
-       throws Exception
-       {
-               return runAssignRowIDMRJob(inputs, inputInfos, brlens, bclens, 
reblockInstructions, replication, smallestFiles, false, null, null);
-       }
-
                
        public static AssignRowIDMRReturn runAssignRowIDMRJob(String[] inputs, 
InputInfo[] inputInfos, int[] brlens, int[] bclens, 
-                       String reblockInstructions, int replication, String[] 
smallestFiles, boolean transform, String naStrings, String spec) 
+                       String reblockInstructions, int replication, String[] 
smallestFiles) 
        throws Exception
        {
                AssignRowIDMRReturn ret=new AssignRowIDMRReturn();
@@ -347,16 +338,6 @@ public class CSVReblockMR
                job.setOutputKeyClass(ByteWritable.class);
                job.setOutputValueClass(OffsetCount.class);
                
-               // setup properties relevant to transform
-               job.setBoolean(MRJobConfiguration.TF_TRANSFORM, transform);
-               if (transform)
-               {
-                       if ( naStrings != null)
-                               // Adding "dummy" string to handle the case of 
na_strings = ""
-                               job.set(MRJobConfiguration.TF_NA_STRINGS, 
TfUtils.prepNAStrings(naStrings) );
-                       job.set(MRJobConfiguration.TF_SPEC, spec);
-               }
-               
                RunningJob runjob=JobClient.runJob(job);
                
                /* Process different counters */

http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
index 85b55cb..4f9877b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
@@ -36,7 +36,6 @@ import 
org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.CSVReblockMR;
 import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
-import org.apache.sysml.runtime.transform.TfUtils;
 
 public class CSVAssignRowIDMapper extends MapReduceBase implements 
Mapper<LongWritable, Text, ByteWritable, OffsetCount>
 {      
@@ -51,9 +50,6 @@ public class CSVAssignRowIDMapper extends MapReduceBase 
implements Mapper<LongWr
        private String filename = "";
        private boolean headerFile = false;
        
-       // members relevant to transform
-       private TfUtils _agents = null;
-       
        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<ByteWritable, OffsetCount> out, 
Reporter report)
@@ -69,7 +65,7 @@ public class CSVAssignRowIDMapper extends MapReduceBase 
implements Mapper<LongWr
                if(key.get()==0 && headerFile) {
                        if(!ignoreFirstLine) {
                                
report.incrCounter(CSVReblockMR.NUM_COLS_IN_MATRIX, outKey.toString(), 
value.toString().split(delim, -1).length);
-                               num += omit(value.toString()) ? 0 : 1;
+                               num++;
                        }
                        else
                                realFirstLine = true;
@@ -79,7 +75,7 @@ public class CSVAssignRowIDMapper extends MapReduceBase 
implements Mapper<LongWr
                                
report.incrCounter(CSVReblockMR.NUM_COLS_IN_MATRIX, outKey.toString(), 
value.toString().split(delim, -1).length);
                                realFirstLine = false;
                        }
-                       num += omit(value.toString()) ? 0 : 1;
+                       num++;
                }
        }
        
@@ -107,23 +103,12 @@ public class CSVAssignRowIDMapper extends MapReduceBase 
implements Mapper<LongWr
                                        ignoreFirstLine = ins.hasHeader;
                                        break;
                                }
-               
-                       // load properties relevant to transform
-                       boolean omit = 
job.getBoolean(MRJobConfiguration.TF_TRANSFORM, false);
-                       if ( omit ) 
-                               _agents = new TfUtils(job, true);
                }
                catch(Exception e) {
                        throw new RuntimeException(e);
                }
        }
        
-       private boolean omit(String line) {
-               if(_agents == null)
-                       return false;           
-               return _agents.omit( line.split(delim, -1) );
-       }
-       
        @Override
        public void close() throws IOException {
                if( outCache != null ) //robustness empty splits

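After removing the omit() hook, CSVAssignRowIDMapper simply counts every non-header line and emits the per-split count in close(). The sketch below shows that count-in-map, emit-in-close structure of the old mapred API in isolation; the key/value types are simplified stand-ins (NullWritable/LongWritable) rather than the real ByteWritable/OffsetCount pair.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Row-counting mapper: with the transform omit() filtering gone, every
// input line simply increments the count; the total is emitted once per split.
public class RowCountMapperSketch extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, LongWritable> {

    private long num = 0;
    private OutputCollector<NullWritable, LongWritable> outCache = null;

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<NullWritable, LongWritable> out, Reporter reporter)
            throws IOException {
        outCache = out; // remember the collector for close()
        num++;          // unconditional count, no omit() filtering anymore
    }

    @Override
    public void close() throws IOException {
        if (outCache != null) // robustness for empty splits
            outCache.collect(NullWritable.get(), new LongWritable(num));
    }
}
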
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index 50a7412..88512b6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -231,33 +231,6 @@ public class MRJobConfiguration
         */
        public static final String NUM_NONZERO_CELLS="nonzeros";
 
-       public static final String TF_NUM_COLS          = 
"transform.num.columns";
-       public static final String TF_HAS_HEADER        = 
"transform.has.header";
-       public static final String TF_DELIM             = 
"transform.field.delimiter";
-       public static final String TF_NA_STRINGS        = 
"transform.na.strings";
-       public static final String TF_HEADER            = 
"transform.header.line";
-       public static final String TF_SPEC              = 
"transform.specification";
-       public static final String TF_TMP_LOC           = 
"transform.temp.location";
-       public static final String TF_TRANSFORM     = "transform.omit.na.rows";
-       
-       public static final String TF_SMALLEST_FILE= "transform.smallest.file";
-       public static final String TF_OFFSETS_FILE = "transform.offsets.file";
-       public static final String TF_TXMTD_PATH   = "transform.txmtd.path";
-       
-       /*public static enum DataTransformJobProperty 
-       {
-               RCD_NUM_COLS("recode.num.columns");
-               
-               private final String name;
-               private DataTransformJobProperty(String n) {
-                       name = n;
-               }
-       }*/
-       
-       public static enum DataTransformCounters { 
-               TRANSFORMED_NUM_ROWS
-       };
-       
        public static final int getMiscMemRequired(JobConf job)
        {
                return job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 
4096);

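The deleted TF_* constants were plain Hadoop configuration keys: the job driver sets them on the JobConf and mappers/reducers read them back in configure(). A minimal sketch of that set/get style, using a hypothetical key name purely for illustration:

import org.apache.hadoop.mapred.JobConf;

public class JobConfKeySketch {
    // Hypothetical key, shown only to illustrate the string-constant
    // configuration style that the removed TF_* keys followed.
    private static final String EXAMPLE_KEY = "example.num.columns";

    public static void main(String[] args) {
        JobConf job = new JobConf();
        job.setLong(EXAMPLE_KEY, 42L);                  // producer side (job setup)
        long numCols = job.getLong(EXAMPLE_KEY, -1L);   // consumer side (mapper/reducer configure)
        System.out.println("num columns = " + numCols); // 42
    }
}
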
http://git-wip-us.apache.org/repos/asf/systemml/blob/0cd3905f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java 
b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
deleted file mode 100644
index e932d1e..0000000
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMR.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.transform;
-import java.util.HashSet;
-
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Counters.Group;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.instructions.InstructionParser;
-import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
-import org.apache.sysml.runtime.matrix.CSVReblockMR.BlockRow;
-import org.apache.sysml.runtime.matrix.JobReturn;
-import org.apache.sysml.runtime.matrix.WriteCSVMR;
-import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.InputInfo;
-import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes;
-import org.apache.sysml.runtime.matrix.mapred.CSVReblockReducer;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.ConvertTarget;
-import 
org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration.MatrixChar_N_ReducerGroups;
-import org.apache.sysml.runtime.util.MapReduceTool;
-
-/**
- * MapReduce job that performs the actual data transformations, such as 
recoding
- * and binning. In contrast to ApplyTxCSVMR, this job generates the output in
- * BinaryBlock format. This job takes a data set as well as the transformation
- * metadata (which, for example, computed from GenTxMtdMR) as inputs.
- * 
- */
-
-@SuppressWarnings("deprecation")
-public class ApplyTfBBMR {
-       
-       public static JobReturn runJob(String inputPath, String rblkInst, 
String otherInst, String spec, String mapsPath, String tmpPath, String 
outputPath, String partOffsetsFile, CSVFileFormatProperties 
inputDataProperties, long numRows, long numColsBefore, long numColsAfter, int 
replication, String headerLine) throws Exception {
-               
-               CSVReblockInstruction rblk = (CSVReblockInstruction) 
InstructionParser.parseSingleInstruction(rblkInst);
-               
-               long[] rlens = new long[]{numRows};
-               long[] clens = new long[]{numColsAfter};
-               int[] brlens = new int[]{rblk.brlen};
-               int[] bclens = new int[]{rblk.bclen};
-               byte[] realIndexes = new byte[]{rblk.input};
-               byte[] resultIndexes = new byte[]{rblk.output};
-
-               JobConf job = new JobConf(ApplyTfBBMR.class);
-               job.setJobName("ApplyTfBB");
-
-               /* Setup MapReduce Job */
-               job.setJarByClass(ApplyTfBBMR.class);
-               
-               // set relevant classes
-               job.setMapperClass(ApplyTfBBMapper.class);
-       
-               MRJobConfiguration.setUpMultipleInputs(job, realIndexes, new 
String[]{inputPath}, new InputInfo[]{InputInfo.CSVInputInfo}, brlens, bclens, 
false, ConvertTarget.CELL);
-
-               MRJobConfiguration.setMatricesDimensions(job, realIndexes, 
rlens, clens);
-               MRJobConfiguration.setBlocksSizes(job, realIndexes, brlens, 
bclens);
-
-               MRJobConfiguration.setCSVReblockInstructions(job, rblkInst);
-               
-               //set up the instructions that will happen in the reducer, 
after the aggregation instrucions
-               MRJobConfiguration.setInstructionsInReducer(job, otherInst);
-
-               job.setInt(MRConfigurationNames.DFS_REPLICATION, replication);
-               
-               //set up preferred custom serialization framework for binary 
block format
-               if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-                       
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-
-               //set up what matrices are needed to pass from the mapper to 
reducer
-               HashSet<Byte> 
mapoutputIndexes=MRJobConfiguration.setUpOutputIndexesForMapper(job, 
realIndexes,  null, 
-                               rblkInst, null, otherInst, resultIndexes);
-
-               MatrixChar_N_ReducerGroups 
ret=MRJobConfiguration.computeMatrixCharacteristics(job, realIndexes, 
-                               null, rblkInst, null, null, null, 
resultIndexes, mapoutputIndexes, false);
-
-               //set up the number of reducers
-               int numRed = WriteCSVMR.determineNumReducers(rlens, clens, 
ConfigurationManager.getNumReducers(), ret.numReducerGroups);
-               job.setNumReduceTasks( numRed );
-
-               //set up the multiple output files, and their format information
-               MRJobConfiguration.setUpMultipleOutputs(job, new 
byte[]{rblk.output}, new byte[]{0}, new String[]{outputPath}, new 
OutputInfo[]{OutputInfo.BinaryBlockOutputInfo}, true, false);
-               
-               // configure mapper and the mapper output key value pairs
-               job.setMapperClass(ApplyTfBBMapper.class);
-               job.setMapOutputKeyClass(TaggedFirstSecondIndexes.class);
-               job.setMapOutputValueClass(BlockRow.class);
-               
-               //configure reducer
-               job.setReducerClass(CSVReblockReducer.class);
-       
-               //turn off adaptivemr
-               job.setBoolean("adaptivemr.map.enable", false);
-
-               //set unique working dir
-               MRJobConfiguration.setUniqueWorkingDir(job);
-               
-               // Add transformation metadata file as well as partOffsetsFile 
to Distributed cache
-               DistributedCache.addCacheFile((new Path(mapsPath)).toUri(), 
job);
-               DistributedCache.createSymlink(job);
-               
-               Path cachefile=new Path(new Path(partOffsetsFile), 
"part-00000");
-               DistributedCache.addCacheFile(cachefile.toUri(), job);
-               DistributedCache.createSymlink(job);
-               
-               job.set(MRJobConfiguration.TF_HAS_HEADER,       
Boolean.toString(inputDataProperties.hasHeader()));
-               job.set(MRJobConfiguration.TF_DELIM,            
inputDataProperties.getDelim());
-               // Adding "dummy" string to handle the case of na_strings = ""
-               if( inputDataProperties.getNAStrings() != null )
-                       job.set(MRJobConfiguration.TF_NA_STRINGS, 
TfUtils.prepNAStrings(inputDataProperties.getNAStrings()) );
-               job.set(MRJobConfiguration.TF_SPEC, spec);
-               job.set(MRJobConfiguration.TF_SMALLEST_FILE, 
CSVReblockMR.findSmallestFile(job, inputPath));
-               job.set(MRJobConfiguration.OUTPUT_MATRICES_DIRS_CONFIG, 
outputPath);
-               job.setLong(MRJobConfiguration.TF_NUM_COLS, numColsBefore);
-               job.set(MRJobConfiguration.TF_TXMTD_PATH, mapsPath);
-               job.set(MRJobConfiguration.TF_HEADER, headerLine);
-               job.set(CSVReblockMR.ROWID_FILE_NAME, cachefile.toString());
-               job.set(MRJobConfiguration.TF_TMP_LOC, tmpPath);
-
-               RunningJob runjob=JobClient.runJob(job);
-               
-               MapReduceTool.deleteFileIfExistOnHDFS(cachefile, job);
-               
-               Group 
group=runjob.getCounters().getGroup(MRJobConfiguration.NUM_NONZERO_CELLS);
-               for(int i=0; i<resultIndexes.length; i++) {
-                       
ret.stats[i].setNonZeros(group.getCounter(Integer.toString(i)));
-               }
-               return new JobReturn(ret.stats, runjob.isSuccessful());
-       }
-       
-}
