[SYSTEMML-569] Distributed spark transform encode/decode (recode, dummy)

(1) Rework of the internal encoder/decoder APIs to streamline metadata
initialization and avoid redundancy.

(2) Extended cp transform encode/decode for dummycoding, including
generalizations for input/output handling (e.g., pass-through
encoder/decoder) as well as a new dummycoding decoder.

(3) New spark instructions transformencode and transformdecode
(compiler/runtime, so far only for recode and dummycode).

(4) New test case for transformencode/transformdecode over frames with
different execution types.

(5) Fixes for frame readers from HDFS and RDDs (schema handling),
transform spec parsing (consistently sorted column id lists), and some
jmlc tests (valid script/input metadata).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/293c81c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/293c81c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/293c81c6

Branch: refs/heads/master
Commit: 293c81c6cae5dbd187b19e09806821f2f2f7db0d
Parents: 7d4acdd
Author: Matthias Boehm <[email protected]>
Authored: Mon Jun 20 12:41:46 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jun 23 17:56:40 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/FunctionOp.java  |  36 +--
 .../sysml/hops/ParameterizedBuiltinOp.java      |   2 +-
 .../RewriteRemovePersistentReadWrite.java       |   6 +-
 .../org/apache/sysml/lops/FunctionCallCP.java   |  20 +-
 .../controlprogram/caching/FrameObject.java     |  27 +-
 .../functionobjects/ParameterizedBuiltin.java   |  11 +-
 .../instructions/SPInstructionParser.java       |   6 +
 .../cp/ParameterizedBuiltinCPInstruction.java   |   7 +-
 ...ReturnParameterizedBuiltinSPInstruction.java | 269 +++++++++++++++++++
 .../ParameterizedBuiltinSPInstruction.java      |  86 +++++-
 .../instructions/spark/SPInstruction.java       |   2 +-
 .../apache/sysml/runtime/io/FrameReader.java    |  20 +-
 .../sysml/runtime/matrix/data/FrameBlock.java   |  12 +
 .../sysml/runtime/transform/BinAgent.java       |  43 ++-
 .../sysml/runtime/transform/DataTransform.java  |  23 --
 .../sysml/runtime/transform/DummycodeAgent.java |  40 ++-
 .../sysml/runtime/transform/MVImputeAgent.java  | 103 ++++---
 .../sysml/runtime/transform/OmitAgent.java      |  33 +--
 .../sysml/runtime/transform/RecodeAgent.java    | 174 ++++++------
 .../sysml/runtime/transform/decode/Decoder.java |  27 +-
 .../transform/decode/DecoderComposite.java      |  17 +-
 .../transform/decode/DecoderDummycode.java      |  73 +++++
 .../transform/decode/DecoderFactory.java        |  63 ++---
 .../transform/decode/DecoderPassThrough.java    |  52 ++--
 .../runtime/transform/decode/DecoderRecode.java |  73 ++---
 .../sysml/runtime/transform/encode/Encoder.java |  41 ++-
 .../transform/encode/EncoderComposite.java      |  57 ++--
 .../transform/encode/EncoderFactory.java        |  32 +--
 .../transform/encode/EncoderPassThrough.java    |  33 +--
 .../runtime/transform/meta/TfMetaUtils.java     |   5 +-
 .../functions/jmlc/FrameReadMetaTest.java       |   4 +-
 .../transform/TransformFrameApplyTest.java      | 194 +++++++++++++
 .../TransformFrameEncodeDecodeTest.java         | 146 ++++++++++
 .../functions/transform/TransformFrameTest.java | 194 -------------
 .../org/apache/sysml/test/utils/TestUtils.java  |  21 ++
 .../jmlc/tfmtd_example2/Recode/district.map     |   4 +
 .../jmlc/tfmtd_example2/Recode/district.mode    |   1 +
 .../tfmtd_example2/Recode/district.ndistinct    |   1 +
 .../jmlc/tfmtd_example2/Recode/floors.map       |   3 +
 .../jmlc/tfmtd_example2/Recode/floors.mode      |   1 +
 .../jmlc/tfmtd_example2/Recode/floors.ndistinct |   1 +
 .../jmlc/tfmtd_example2/Recode/numbathrooms.map |   5 +
 .../tfmtd_example2/Recode/numbathrooms.mode     |   1 +
 .../Recode/numbathrooms.ndistinct               |   1 +
 .../jmlc/tfmtd_example2/Recode/numbedrooms.map  |   7 +
 .../jmlc/tfmtd_example2/Recode/numbedrooms.mode |   1 +
 .../tfmtd_example2/Recode/numbedrooms.ndistinct |   1 +
 .../jmlc/tfmtd_example2/Recode/view.map         |   2 +
 .../jmlc/tfmtd_example2/Recode/view.mode        |   1 +
 .../jmlc/tfmtd_example2/Recode/view.ndistinct   |   1 +
 .../jmlc/tfmtd_example2/Recode/zipcode.map      |   5 +
 .../jmlc/tfmtd_example2/Recode/zipcode.mode     |   1 +
 .../tfmtd_example2/Recode/zipcode.ndistinct     |   1 +
 .../functions/jmlc/tfmtd_example2/coltypes.csv  |   1 +
 .../functions/jmlc/tfmtd_example2/column.names  |   1 +
 .../functions/jmlc/tfmtd_example2/spec.json     |   1 +
 src/test/scripts/functions/jmlc/transform5.dml  |   1 +
 .../transform/TransformFrameEncodeDecode.dml    |  32 +++
 .../functions/transform/ZPackageSuite.java      |   3 +-
 59 files changed, 1337 insertions(+), 692 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/hops/FunctionOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/FunctionOp.java 
b/src/main/java/org/apache/sysml/hops/FunctionOp.java
index ae199b5..03db4da 100644
--- a/src/main/java/org/apache/sysml/hops/FunctionOp.java
+++ b/src/main/java/org/apache/sysml/hops/FunctionOp.java
@@ -214,16 +214,13 @@ public class FunctionOp extends Hop
 
                ExecType et = optFindExecType();
                
-               if ( et != ExecType.CP ) {
-                       throw new HopsException("Invalid execution type for 
function: " + _fname);
-               }
                //construct input lops (recursive)
                ArrayList<Lop> tmp = new ArrayList<Lop>();
                for( Hop in : getInput() )
                        tmp.add( in.constructLops() );
                
                //construct function call
-               FunctionCallCP fcall = new FunctionCallCP( tmp, _fnamespace, 
_fname, _outputs, _outputHops );
+               FunctionCallCP fcall = new FunctionCallCP( tmp, _fnamespace, 
_fname, _outputs, _outputHops, et );
                setLineNumbers( fcall );
                setLops( fcall );
        
@@ -242,23 +239,30 @@ public class FunctionOp extends Hop
        protected ExecType optFindExecType() 
                throws HopsException 
        {
+               checkAndSetForcedPlatform();
+               
                if ( getFunctionType() == FunctionType.MULTIRETURN_BUILTIN ) {
-                       // Since the memory estimate is only conservative, do 
not throw
-                       // exception if the estimated memory is larger than the 
budget
-                       // Nevertheless, memory estimates these functions are 
useful for 
-                       // other purposes, such as compiling parfor
-                       return ExecType.CP;
                        
                        // check if there is sufficient memory to execute this 
function
-                       /*if ( getMemEstimate() < 
OptimizerUtils.getMemBudget(true) ) {
-                               return ExecType.CP;
-                       }
+                       if( 
getFunctionName().equalsIgnoreCase("transformencode") ) {
+                               _etype = ((_etypeForced==ExecType.SPARK 
+                                       || (getMemEstimate() >= 
OptimizerUtils.getLocalMemBudget()
+                                               && 
OptimizerUtils.isSparkExecutionMode())) ? ExecType.SPARK : ExecType.CP);
+                       }       
                        else {
-                               throw new HopsException("Insufficient memory to 
execute function: " + getFunctionName());
-                       }*/
+                               // Since the memory estimate is only 
conservative, do not throw
+                               // exception if the estimated memory is larger 
than the budget
+                               // Nevertheless, memory estimates these 
functions are useful for 
+                               // other purposes, such as compiling parfor
+                               _etype = ExecType.CP;
+                       }
                }
-               // the actual function call is always CP
-               return ExecType.CP;
+               else {
+                       // the actual function call is always CP
+                       _etype = ExecType.CP;
+               }
+               
+               return _etype;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java 
b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 02d3715..f1ca98c 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -1121,7 +1121,7 @@ public class ParameterizedBuiltinOp extends Hop 
implements MultiThreadedHop
                
                //force CP for in-memory only transform builtins
                if( (_op == ParamBuiltinOp.TRANSFORMAPPLY && 
REMOTE==ExecType.MR)
-                       || _op == ParamBuiltinOp.TRANSFORMDECODE
+                       || _op == ParamBuiltinOp.TRANSFORMDECODE && 
REMOTE==ExecType.MR
                        || _op == ParamBuiltinOp.TRANSFORMMETA 
                        ||  _op == ParamBuiltinOp.TOSTRING) {
                        _etype = ExecType.CP;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java
 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java
index 89812f7..74db35a 100644
--- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java
+++ 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteRemovePersistentReadWrite.java
@@ -110,15 +110,17 @@ public class RewriteRemovePersistentReadWrite extends 
HopRewriteRule
                                                if (hop.getDataType() == 
DataType.SCALAR) {
                                                        
dop.removeInput("iofilename");
                                                }
-                                       } else
+                                       } 
+                                       else
                                                LOG.warn("Non-registered 
persistent read of variable '"+dop.getName()+"' (line 
"+dop.getBeginLine()+").");
                                        break;
                                case PERSISTENTWRITE:
-                                       if( _outputs.contains(dop.getName()) )
+                                       if( _outputs.contains(dop.getName()) ) {
                                                
dop.setDataOpType(DataOpTypes.TRANSIENTWRITE);
                                                if (hop.getDataType() == 
DataType.SCALAR) {
                                                        
dop.removeInput("iofilename");
                                                }
+                                       }
                                        else
                                                LOG.warn("Non-registered 
persistent write of variable '"+dop.getName()+"' (line 
"+dop.getBeginLine()+").");
                                        break;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/lops/FunctionCallCP.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/FunctionCallCP.java 
b/src/main/java/org/apache/sysml/lops/FunctionCallCP.java
index 9cbc9c1..cea01af 100644
--- a/src/main/java/org/apache/sysml/lops/FunctionCallCP.java
+++ b/src/main/java/org/apache/sysml/lops/FunctionCallCP.java
@@ -42,17 +42,18 @@ public class FunctionCallCP extends Lop
        private String[] _outputs;
        private ArrayList<Lop> _outputLops = null;
 
-       public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String 
fname, String[] outputs, ArrayList<Hop> outputHops) throws HopsException, 
LopsException {
-               this(inputs, fnamespace, fname, outputs);
+       public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String 
fname, String[] outputs, ArrayList<Hop> outputHops, ExecType et) 
+               throws HopsException, LopsException 
+       {
+               this(inputs, fnamespace, fname, outputs, et);
                if(outputHops != null) {
                        _outputLops = new ArrayList<Lop>();
-                       for(Hop h : outputHops) {
+                       for(Hop h : outputHops)
                                _outputLops.add( h.constructLops() );
-                       }
                }
        }
        
-       public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String 
fname, String[] outputs) 
+       public FunctionCallCP(ArrayList<Lop> inputs, String fnamespace, String 
fname, String[] outputs, ExecType et) 
        {
                super(Lop.Type.FunctionCallCP, DataType.UNKNOWN, 
ValueType.UNKNOWN);    
                //note: data scalar in order to prevent generation of redundant 
createvar, rmvar
@@ -62,8 +63,7 @@ public class FunctionCallCP extends Lop
                _outputs = outputs;
                
                //wire inputs
-               for( Lop in : inputs )
-               {
+               for( Lop in : inputs ) {
                        addInput( in );
                        in.addOutput( this );
                }
@@ -73,7 +73,7 @@ public class FunctionCallCP extends Lop
                boolean aligner = false;
                boolean definesMRJob = false;
                lps.addCompatibility(JobType.INVALID);
-               this.lps.setProperties(inputs, ExecType.CP, 
ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+               lps.setProperties(inputs, et, ExecLocation.ControlProgram, 
breaksAlignment, aligner, definesMRJob );
        }
 
        public ArrayList<Lop> getFunctionOutputs() {
@@ -87,7 +87,7 @@ public class FunctionCallCP extends Lop
 
        private String getInstructionsMultipleReturnBuiltins(String[] inputs, 
String[] outputs) {
                StringBuilder sb = new StringBuilder();
-               sb.append("CP");
+               sb.append(getExecType());
                
                sb.append(Lop.OPERAND_DELIMITOR); 
                sb.append(_fname.toLowerCase());
@@ -123,8 +123,8 @@ public class FunctionCallCP extends Lop
                //NOTE: we have to append full input operand information to 
distinguish literals from variables w/ equal name
 
                StringBuilder inst = new StringBuilder();
+               inst.append(getExecType());
                
-               inst.append("CP");
                inst.append(Lop.OPERAND_DELIMITOR); 
                inst.append("extfunct");
                inst.append(Lop.OPERAND_DELIMITOR);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index 04d1543..db98a3e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -177,14 +177,6 @@ public class FrameObject extends CacheableData<FrameBlock>
                return data;
        }
 
-       /**
-        * Read Frame object from RDD 
-        * 
-        * @param rdd
-        * @param status
-        * 
-        * @param fo
-        */
        @Override
        protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean 
status)
                        throws IOException 
@@ -198,26 +190,27 @@ public class FrameObject extends CacheableData<FrameBlock>
                
                MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
                MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
+               int rlen = (int)mc.getRows();
+               int clen = (int)mc.getCols();
+               
+               //handle missing schema if necessary
+               List<ValueType> lschema = (_schema!=null) ? _schema : 
+                       Collections.nCopies(clen>=1 ? (int)clen : 1, 
ValueType.STRING);
                
                FrameBlock fb = null;
-               try 
-               {
+               try  {
                        //prevent unnecessary collect through rdd checkpoint
                        if( rdd.allowsShortCircuitCollect() ) {
                                lrdd = (RDDObject)rdd.getLineageChilds().get(0);
                        }
                        
-                       //obtain frame block from RDD
-                       int rlen = (int)mc.getRows();
-                       int clen = (int)mc.getCols();
-
-                       //collect frame block from binary cell RDD
-                       fb = SparkExecutionContext.toFrameBlock(lrdd, _schema, 
rlen, clen);     
+                       //collect frame block from binary block RDD
+                       fb = SparkExecutionContext.toFrameBlock(lrdd, lschema, 
rlen, clen);     
                }
                catch(DMLRuntimeException ex) {
                        throw new IOException(ex);
                }
-               
+                               
                //sanity check correct output
                if( fb == null ) {
                        throw new IOException("Unable to load frame from rdd: 
"+lrdd.getVarName());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
 
b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
index 2bd19fa..c542328 100644
--- 
a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
+++ 
b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
@@ -44,8 +44,11 @@ public class ParameterizedBuiltin extends ValueFunction
 
        private static final long serialVersionUID = -5966242955816522697L;
        
-       public enum ParameterizedBuiltinCode { INVALID, CDF, INVCDF, RMEMPTY, 
REPLACE, REXPAND, TRANSFORM, TRANSFORMAPPLY };
-       public enum ProbabilityDistributionCode { INVALID, NORMAL, EXP, CHISQ, 
F, T };
+       public enum ParameterizedBuiltinCode { 
+               INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, 
+               TRANSFORM, TRANSFORMAPPLY, TRANSFORMDECODE };
+       public enum ProbabilityDistributionCode { 
+               INVALID, NORMAL, EXP, CHISQ, F, T };
        
        public ParameterizedBuiltinCode bFunc;
        public ProbabilityDistributionCode distFunc;
@@ -61,6 +64,7 @@ public class ParameterizedBuiltin extends ValueFunction
                String2ParameterizedBuiltinCode.put( "rexpand", 
ParameterizedBuiltinCode.REXPAND);
                String2ParameterizedBuiltinCode.put( "transform", 
ParameterizedBuiltinCode.TRANSFORM);
                String2ParameterizedBuiltinCode.put( "transformapply", 
ParameterizedBuiltinCode.TRANSFORMAPPLY);
+               String2ParameterizedBuiltinCode.put( "transformdecode", 
ParameterizedBuiltinCode.TRANSFORMDECODE);
        }
        
        static public HashMap<String, ProbabilityDistributionCode> 
String2DistCode;
@@ -170,6 +174,9 @@ public class ParameterizedBuiltin extends ValueFunction
                        
                        case TRANSFORMAPPLY:
                                return new 
ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMAPPLY);
+                       
+                       case TRANSFORMDECODE:
+                               return new 
ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMDECODE);
                                
                        default:
                                throw new DMLRuntimeException("Invalid 
parameterized builtin code: " + code);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 663e6b4..e0c4631 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -56,6 +56,7 @@ import 
org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
+import 
org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PmmSPInstruction;
@@ -213,6 +214,8 @@ public class SPInstructionParser extends InstructionParser
                String2SPInstructionType.put( "rexpand"      , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( "transform"    , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( 
"transformapply",SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+               String2SPInstructionType.put( 
"transformdecode",SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+               String2SPInstructionType.put( 
"transformencode",SPINSTRUCTION_TYPE.MultiReturnBuiltin);
                
                String2SPInstructionType.put( "mappend", 
SPINSTRUCTION_TYPE.MAppend);
                String2SPInstructionType.put( "rappend", 
SPINSTRUCTION_TYPE.RAppend);
@@ -367,6 +370,9 @@ public class SPInstructionParser extends InstructionParser
                        case ParameterizedBuiltin:
                                return 
ParameterizedBuiltinSPInstruction.parseInstruction(str);
                                
+                       case MultiReturnBuiltin:
+                               return 
MultiReturnParameterizedBuiltinSPInstruction.parseInstruction(str);
+                               
                        case MatrixReshape:
                                return 
MatrixReshapeSPInstruction.parseInstruction(str);
                                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index e6a536d..eca1627 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -43,6 +43,8 @@ import org.apache.sysml.runtime.transform.DataTransform;
 import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.transform.decode.Decoder;
 import org.apache.sysml.runtime.transform.decode.DecoderFactory;
+import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.encode.EncoderFactory;
 import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.DataConverter;
 
@@ -269,7 +271,8 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
                        FrameBlock meta = ec.getFrameInput(params.get("meta")); 
        
                        
                        //compute transformapply
-                       MatrixBlock mbout = 
DataTransform.cpDataTransform(getParameterMap(), data, meta );
+                       Encoder encoder = 
EncoderFactory.createEncoder(params.get("spec"), data.getNumColumns(), meta);
+                       MatrixBlock mbout = encoder.apply(data, new 
MatrixBlock(data.getNumRows(), data.getNumColumns(), false));
                        
                        //release locks
                        ec.setMatrixOutput(output.getName(), mbout);
@@ -283,7 +286,7 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
                        
                        //compute transformdecode
                        Decoder decoder = 
DecoderFactory.createDecoder(getParameterMap().get("spec"), null, meta);
-                       FrameBlock fbout = decoder.decode(data, new 
FrameBlock(data.getNumColumns(), ValueType.STRING));
+                       FrameBlock fbout = decoder.decode(data, new 
FrameBlock(meta.getNumColumns(), ValueType.STRING));
                        
                        //release locks
                        ec.setFrameOutput(output.getName(), fbout);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
new file mode 100644
index 0000000..fc9e9ce
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyFunction;
+import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyOffsetFunction;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
+import org.apache.sysml.runtime.io.FrameReader;
+import org.apache.sysml.runtime.io.FrameReaderFactory;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.transform.RecodeAgent;
+import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.encode.EncoderComposite;
+import org.apache.sysml.runtime.transform.encode.EncoderFactory;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
+import org.apache.sysml.runtime.transform.meta.TfOffsetMap;
+
+import scala.Tuple2;
+
+/** Spark instruction for multi-return parameterized builtins; so far only transformencode: (frame, spec) -> (matrix, meta data frame). */
+public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPInstruction 
+{
+       protected ArrayList<CPOperand> _outputs;
+       
+       public MultiReturnParameterizedBuiltinSPInstruction(Operator op, CPOperand input1, CPOperand input2, ArrayList<CPOperand> outputs, String opcode, String istr ) {
+               super(op, input1, input2, outputs.get(0), opcode, istr);
+               _sptype = SPINSTRUCTION_TYPE.MultiReturnBuiltin;
+               _outputs = outputs;
+       }
+       
+       public CPOperand getOutput(int i) {
+               return _outputs.get(i);
+       }
+       
+       /**
+        * Parses a multi-return parameterized builtin instruction (so far only transformencode).
+        * @param str instruction string
+        * @return parsed instruction (output 0: transformed matrix, output 1: meta data frame)
+        * @throws DMLRuntimeException if the opcode is not supported
+        */
+       public static MultiReturnParameterizedBuiltinSPInstruction parseInstruction( String str ) 
+               throws DMLRuntimeException 
+       {
+               String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+               ArrayList<CPOperand> outputs = new ArrayList<CPOperand>();
+               String opcode = parts[0];
+               
+               if ( opcode.equalsIgnoreCase("transformencode") ) {
+                       // two inputs (data frame, spec) and two outputs (matrix, meta data frame)
+                       CPOperand in1 = new CPOperand(parts[1]);
+                       CPOperand in2 = new CPOperand(parts[2]);
+                       outputs.add ( new CPOperand(parts[3], ValueType.DOUBLE, DataType.MATRIX) );
+                       outputs.add ( new CPOperand(parts[4], ValueType.STRING, DataType.FRAME) );
+                       return new MultiReturnParameterizedBuiltinSPInstruction(null, in1, in2, outputs, opcode, str);
+               }
+               else {
+                       throw new DMLRuntimeException("Invalid opcode in MultiReturnBuiltin instruction: " + opcode);
+               }
+
+       }
+
+       @Override 
+       @SuppressWarnings("unchecked")
+       public void processInstruction(ExecutionContext ec) 
+               throws DMLRuntimeException 
+       {
+               SparkExecutionContext sec = (SparkExecutionContext) ec;
+               
+               try
+               {
+                       //get input RDD and meta data
+                       FrameObject fo = sec.getFrameObject(input1.getName());
+                       FrameObject fometa = sec.getFrameObject(_outputs.get(1).getName());
+                       JavaPairRDD<Long,FrameBlock> in = (JavaPairRDD<Long,FrameBlock>)
+                                       sec.getRDDHandleForFrameObject(fo, InputInfo.BinaryBlockInputInfo);
+                       String spec = ec.getScalarInput(input2.getName(), input2.getValueType(), input2.isLiteral()).getStringValue();
+                       MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(input1.getName());
+                       MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+                       
+                       //step 1: build transform meta data
+                       Encoder encoderBuild = EncoderFactory.createEncoder(spec, 
+                                       fo.getSchema(), (int)fo.getNumColumns(), null);
+                       
+                       Accumulator<Long> accMax = sec.getSparkContext().accumulator(0L, new MaxAcc()); 
+                       in.mapPartitionsToPair(new TransformEncodeBuildFunction(encoderBuild))
+                         .distinct().groupByKey()
+                         .flatMap(new TransformEncodeGroupFunction(accMax))
+                         .saveAsTextFile(fometa.getFileName()); //trigger eval
+                       
+                       //reuse multi-threaded reader (num rows = max num distinct tokens over all columns)
+                       FrameReader reader = FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo);
+                       FrameBlock meta = reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(), fo.getNumColumns());
+                       meta.recomputeColumnCardinality(); //recompute num distinct items per column
+                       
+                       //step 2: transform apply (similar to spark transformapply)
+                       //compute omit offset map for block shifts
+                       TfOffsetMap omap = null;
+                       if( TfMetaUtils.containsOmitSpec(spec) ) {
+                               omap = new TfOffsetMap(SparkUtils.toIndexedLong(in.mapToPair(
+                                       new RDDTransformApplyOffsetFunction(spec)).collect()));
+                       }
+                               
+                       //create encoder broadcast (avoiding replication per task) 
+                       Encoder encoder = EncoderFactory.createEncoder(spec, 
+                                       fo.getSchema(), (int)fo.getNumColumns(), meta);
+                       mcOut.setDimension(mcIn.getRows()-((omap!=null)?omap.getNumRmRows():0), encoder.getNumCols()); 
+                       Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder);
+                       Broadcast<TfOffsetMap> bomap = (omap!=null) ? sec.getSparkContext().broadcast(omap) : null;
+                       
+                       //execute transform apply
+                       JavaPairRDD<Long,FrameBlock> tmp = in
+                                       .mapToPair(new RDDTransformApplyFunction(bmeta, bomap));
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = FrameRDDConverterUtils
+                                       .binaryBlockToMatrixBlock(tmp, mcOut, mcOut);
+                       
+                       //set output and maintain lineage/output characteristics
+                       sec.setRDDHandleForVariable(_outputs.get(0).getName(), out);
+                       sec.addLineageRDD(_outputs.get(0).getName(), input1.getName());
+                       sec.setFrameOutput(_outputs.get(1).getName(), meta);
+               }
+               catch(IOException ex) {
+                       throw new DMLRuntimeException(ex); //surface I/O failures via the declared exception type
+               }
+       }
+       
+       /**
+        * Builds the encoder meta data over all frame blocks of a partition and emits the recode tokens of this partition as (column ID, token) pairs.
+        */
+       public static class TransformEncodeBuildFunction implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, String>
+       {
+               private static final long serialVersionUID = 6336375833412029279L;
+
+               private Encoder _encoder = null;
+               
+               public TransformEncodeBuildFunction(Encoder encoder) {
+                       _encoder = encoder;
+               }
+               
+               @Override
+               public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
+                       throws Exception 
+               {
+                       //build meta data (e.g., recode maps)
+                       while( iter.hasNext() ) {
+                               _encoder.build(iter.next()._2());       
+                       }
+                       
+                       //output recode maps as columnID - token pairs (duplicates across partitions are removed by the caller)
+                       ArrayList<Tuple2<Integer,String>> ret = new ArrayList<Tuple2<Integer,String>>();
+                       if( _encoder instanceof EncoderComposite )
+                               for( Encoder cEncoder : ((EncoderComposite)_encoder).getEncoders() )
+                                       if( cEncoder instanceof RecodeAgent ) {
+                                               RecodeAgent ra = (RecodeAgent) cEncoder;
+                                               HashMap<Integer,HashMap<String,Long>> tmp = ra.getCPRecodeMaps();
+                                               for( Entry<Integer,HashMap<String,Long>> e1 : tmp.entrySet() )
+                                                       for( String token : e1.getValue().keySet() )
+                                                               ret.add(new Tuple2<Integer,String>(e1.getKey(), token));
+                                       }
+                               
+                       return ret;
+               }
+       }
+       
+       /**
+        * Assigns consecutive codes (starting at 1) to the tokens of one column, emits them as text cells "row column entry", and tracks the max token count via an accumulator.
+        */
+       public static class TransformEncodeGroupFunction implements FlatMapFunction<Tuple2<Integer, Iterable<String>>, String>
+       {
+               private static final long serialVersionUID = -1034187226023517119L;
+
+               private Accumulator<Long> _accMax = null;
+               
+               public TransformEncodeGroupFunction( Accumulator<Long> accMax ) {
+                       _accMax = accMax;
+               }
+               
+               @Override
+               public Iterable<String> call(Tuple2<Integer, Iterable<String>> arg0)
+                       throws Exception 
+               {
+                       String colID = String.valueOf(arg0._1());
+                       Iterator<String> iter = arg0._2().iterator();
+                       
+                       ArrayList<String> ret = new ArrayList<String>();
+                       StringBuilder sb = new StringBuilder();
+                       long rowID = 1;
+                       while( iter.hasNext() ) {
+                               sb.append(rowID);
+                               sb.append(' ');
+                               sb.append(colID);
+                               sb.append(' ');
+                               sb.append(RecodeAgent.constructRecodeMapEntry(iter.next(), rowID));
+                               ret.add(sb.toString());
+                               sb.setLength(0); 
+                               rowID++;
+                       }
+                       _accMax.add(rowID-1); //number of tokens of this column
+                       
+                       return ret;
+               }
+       }
+       
+       /**
+        * Accumulator parameter that aggregates long values via max.
+        */
+       private static class MaxAcc implements AccumulatorParam<Long>, Serializable 
+       {
+               private static final long serialVersionUID = -3739727823287550826L;
+
+               @Override
+               public Long addInPlace(Long arg0, Long arg1) {
+                       return Math.max(arg0, arg1);
+               }
+
+               @Override
+               public Long zero(Long arg0) {
+                       return arg0; //initial value (0L at the call site) is neutral for max over non-negative counts
+               }
+
+               @Override
+               public Long addAccumulator(Long arg0, Long arg1) {
+                       return Math.max(arg0, arg1);    
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index a80dd1b..25a4078 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -37,6 +37,7 @@ import org.apache.sysml.parser.Statement;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
@@ -72,6 +73,8 @@ import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
 import org.apache.sysml.runtime.transform.DataTransform;
 import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.decode.Decoder;
+import org.apache.sysml.runtime.transform.decode.DecoderFactory;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.transform.encode.EncoderFactory;
 import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
@@ -169,7 +172,8 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        else if(   opcode.equalsIgnoreCase("rexpand") 
                                        || opcode.equalsIgnoreCase("replace")
                                        || opcode.equalsIgnoreCase("transform")
-                                       || 
opcode.equalsIgnoreCase("transformapply")) 
+                                       || 
opcode.equalsIgnoreCase("transformapply")
+                                       || 
opcode.equalsIgnoreCase("transformdecode")) 
                        {
                                func = 
ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
                                return new 
ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, 
opcode, str, false);
@@ -450,6 +454,30 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        sec.addLineageRDD(output.getName(), params.get("target"));
                        ec.releaseFrameInput(params.get("meta"));
                }
+               else if ( opcode.equalsIgnoreCase("transformdecode") ) 
+               {
+                       //get input RDD and meta data
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(params.get("target"));
+                       MatrixCharacteristics mc = sec.getMatrixCharacteristics(params.get("target"));
+                       FrameBlock meta = sec.getFrameInput(params.get("meta"));
+                       
+                       //reblock if necessary (clen > bclen), i.e., only if rows span multiple column blocks
+                       if( mc.getCols() > mc.getColsPerBlock() ) {
+                               in = in.mapToPair(new RDDTransformDecodeExpandFunction(
+                                               (int)mc.getCols(), mc.getColsPerBlock()));
+                               in = RDDAggregateUtils.mergeByKey(in);
+                       }
+                       
+                       //construct decoder and decode individual matrix blocks
+                       Decoder decoder = DecoderFactory.createDecoder(params.get("spec"), null, meta);
+                       JavaPairRDD<Long,FrameBlock> out = in.mapToPair(
+                                       new RDDTransformDecodeFunction(decoder, meta.getNumColumns(), mc.getRowsPerBlock()));
+                       
+                       //set output and maintain lineage/output characteristics
+                       sec.setRDDHandleForVariable(output.getName(), out);
+                       sec.addLineageRDD(output.getName(), params.get("target"));
+                       ec.releaseFrameInput(params.get("meta"));
+               }
                else {
                        throw new DMLRuntimeException("Unknown parameterized builtin opcode: "+opcode);
                }
@@ -776,6 +804,62 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
        
        /**
         * 
+        */
+       public static class RDDTransformDecodeFunction implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock> 
+       {
+               private static final long serialVersionUID = -4797324742568170756L;
+               //decodes each matrix block into a STRING frame block, keyed by the global index of the block's first row
+               private Decoder _decoder = null;
+               private int _clen = -1;  //number of output frame columns (caller passes the number of meta data columns)
+               private int _brlen = -1; //number of rows per input matrix block
+               
+               public RDDTransformDecodeFunction(Decoder decoder, int clen, int brlen) {
+                       _decoder = decoder;
+                       _clen = clen;
+                       _brlen = brlen;
+               }
+
+               @Override
+               public Tuple2<Long,FrameBlock> call(Tuple2<MatrixIndexes, MatrixBlock> in) 
+                       throws Exception 
+               {
+                       long rix = UtilFunctions.computeCellIndex(in._1().getRowIndex(), _brlen, 0); //global row index of the block's first row
+                       return new Tuple2<Long, FrameBlock>(rix, 
+                                       _decoder.decode(in._2(), new FrameBlock(_clen, ValueType.STRING)));
+               }
+       }
+       /** Expands a matrix block to the full width (clen) at its column-block offset and re-keys it to column block 1, so all column blocks of a row block can be merged by key. */
+       public static class RDDTransformDecodeExpandFunction implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,MatrixIndexes,MatrixBlock> 
+       {
+               private static final long serialVersionUID = -8187400248076127598L;
+               
+               private int _clen = -1;  //total number of matrix columns (width of the expanded block)
+               private int _bclen = -1; //number of columns per input block
+               
+               public RDDTransformDecodeExpandFunction(int clen, int bclen) {
+                       _clen = clen;
+                       _bclen = bclen;
+               }
+
+               @Override
+               public Tuple2<MatrixIndexes,MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> in) 
+                       throws Exception 
+               {
+                       MatrixIndexes inIx = in._1();
+                       MatrixBlock inBlk = in._2();
+                       
+                       //construct expanded block via leftindexing
+                       int cl = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, 0)-1; //0-based first target column
+                       int cu = (int)UtilFunctions.computeCellIndex(inIx.getColumnIndex(), _bclen, inBlk.getNumColumns()-1)-1; //0-based last target column (inclusive)
+                       MatrixBlock out = new MatrixBlock(inBlk.getNumRows(), _clen, false);
+                       out = out.leftIndexingOperations(inBlk, 0, inBlk.getNumRows()-1, cl, cu, null, UpdateType.INPLACE_PINNED);
+                       
+                       return new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(inIx.getRowIndex(), 1), out);
+               }
+       }
+       
+       /**
+        * 
         * @param mc1
         * @param mcOut
         * @param out

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
index acc2ff6..d9c49aa 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SPInstruction.java
@@ -40,7 +40,7 @@ public abstract class SPInstruction extends Instruction
        public enum SPINSTRUCTION_TYPE { 
                MAPMM, MAPMMCHAIN, CPMM, RMM, TSMM, PMM, ZIPMM, PMAPMM, 
//matrix multiplication instructions  
                MatrixIndexing, Reorg, ArithmeticBinary, RelationalBinary, 
AggregateUnary, AggregateTernary, Reblock, CSVReblock, 
-               Builtin, BuiltinUnary, BuiltinBinary, Checkpoint, Cast,
+               Builtin, BuiltinUnary, BuiltinBinary, MultiReturnBuiltin, 
Checkpoint, Cast,
                CentralMoment, Covariance, QSort, QPick, 
                ParameterizedBuiltin, MAppend, RAppend, GAppend, 
GAlignedAppend, Rand, 
                MatrixReshape, Ternary, Quaternary, CumsumAggregate, 
CumsumOffset, BinUaggChain, UaggOuterChain, 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 6a4b469..d37bbde 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -53,8 +53,7 @@ public abstract class FrameReader
         * @param clen
         * @return
         */
-       public abstract FrameBlock readFrameFromHDFS( String fname, 
List<ValueType> schema, List<String> names,
-                       long rlen, long clen)
+       public abstract FrameBlock readFrameFromHDFS( String fname, 
List<ValueType> schema, List<String> names, long rlen, long clen)
                throws IOException, DMLRuntimeException;
        
        /**
@@ -89,13 +88,11 @@ public abstract class FrameReader
         * @param iNumColumns
         * @return
         */
-       public List<ValueType> getDefSchema( long lNumColumns )
+       public List<ValueType> getDefSchema( long clen )
                throws IOException, DMLRuntimeException
        {
-               List<ValueType> schema = new ArrayList<ValueType>();
-               for (int i=0; i < lNumColumns; ++i)
-                       schema.add(ValueType.STRING);
-               return schema;
+               int lclen = Math.max((int)clen, 1);
+               return Collections.nCopies(lclen, ValueType.STRING);
        }
 
        /**
@@ -103,11 +100,11 @@ public abstract class FrameReader
         * @param iNumColumns
         * @return
         */
-       public List<String> getDefColNames( long lNumColumns )
+       public List<String> getDefColNames( long clen )
                throws IOException, DMLRuntimeException
        {
                List<String> colNames = new ArrayList<String>();
-               for (int i=0; i < lNumColumns; ++i)
+               for (int i=0; i < clen; ++i)
                        colNames.add("C"+i);
                return colNames;
        }
@@ -183,7 +180,7 @@ public abstract class FrameReader
         * @return
         */
        protected static List<String> createOutputNames(List<String> names, 
long ncol) {
-               if( names.size()==1 && ncol > 1 )
+               if( names.size() != ncol )
                        return FrameBlock.createColNames((int)ncol);
                return names;
        }
@@ -203,7 +200,6 @@ public abstract class FrameReader
        
                //check for empty file
                if( MapReduceTool.isFileEmpty( fs, path.toString() ) )
-                       throw new EOFException("Empty input file "+ 
path.toString() +".");
-               
+                       throw new EOFException("Empty input file "+ 
path.toString() +".");              
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index e51bd9d..e5c5fec 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -277,6 +277,18 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                return "C" + i;
        }
        
+       /**
+        * Sets each column's number of distinct values to its count of non-null cells (e.g., recode meta data frames hold one token per cell).
+        */
+       public void recomputeColumnCardinality() {
+               for( int col=0; col<getNumColumns(); col++ ) {
+                       int nonNull = 0;
+                       for( int row=0; row<getNumRows(); row++ )
+                               if( get(row, col) != null ) nonNull++;
+                       _colmeta.get(col).setNumDistinct(nonNull);
+               }
+       }
+       
        ///////
        // basic get and set functionality
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index 3ed9a1d..fe83627 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -315,6 +315,18 @@ public class BinAgent extends Encoder
                }
        }
        
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               build(in); //build meta data (currently a no-op, see build)
+               return apply(in, out);
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               // TODO building bins from the input is not implemented yet (no-op); bins are only initialized from meta data via initMetaData
+       }
+       
        /**
         * Method to apply transformations.
         * 
@@ -360,40 +372,15 @@ public class BinAgent extends Encoder
                }
                return out;
        }
-       
-       @Override
-       public double[] encode(String[] in, double[] out) {
-               // TODO Auto-generated method stub
-               return null;
-       }
-
-       @Override
-       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               build(in);
-               return apply(in, out);
-       }
-
-       @Override
-       public void build(String[] in) {
-               // TODO Auto-generated method stub
-       }
 
        @Override
-       public void build(FrameBlock in) {
-               // TODO Auto-generated method stub
-       }
-
-       @Override
-       public FrameBlock getMetaData(FrameBlock out) {
+       public FrameBlock getMetaData(FrameBlock meta) {
                // TODO Auto-generated method stub
                return null;
        }
        
-       /**
-        * 
-        * @param meta
-        */
-       public void initBins(FrameBlock meta) {
+       @Override
+       public void initMetaData(FrameBlock meta) {
                _binMins = new double[_colList.length][];
                _binMaxs = new double[_colList.length][];
                for( int j=0; j<_colList.length; j++ ) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java 
b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
index c17b662..c1e1702 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
@@ -74,13 +74,10 @@ import org.apache.sysml.runtime.matrix.JobReturn;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
-import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
-import org.apache.sysml.runtime.transform.encode.Encoder;
-import org.apache.sysml.runtime.transform.encode.EncoderFactory;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.utils.JSONHelper;
@@ -1043,26 +1040,6 @@ public class DataTransform
        }
        
        /**
-        * Apply given transform metadata (incl recode maps) over an in-memory 
frame input in order to
-        * create a transformed numerical matrix. Note: The number of rows 
always remains unchanged, 
-        * whereas the number of column might increase or decrease. 
-        * 
-        * @param params
-        * @param input
-        * @param meta
-        * @param spec
-        * @return
-        * @throws DMLRuntimeException
-        * @throws  
-        */
-       public static MatrixBlock cpDataTransform(HashMap<String,String> 
params, FrameBlock input, FrameBlock meta) 
-               throws DMLRuntimeException
-       {
-               Encoder encoder = 
EncoderFactory.createEncoder(params.get("spec"), input.getNumColumns(), meta);
-               return encoder.apply(input, new MatrixBlock(input.getNumRows(), 
input.getNumColumns(), false));
-       }
-       
-       /**
         * Helper function to fetch and sort the list of part files under the 
given
         * input directory.
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
index bcb06df..aeeb62b 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
@@ -377,10 +377,20 @@ public class DummycodeAgent extends Encoder
                                }
                        }
                        _dummycodedLength += _domainSizes[i]-1;
-                       //System.out.println("colID=" + colID + ", domainsize=" 
+ _domainSizes[i] + ", dcdLength=" + _dummycodedLength);
                }
        }
 
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               return apply(in, out);
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               //do nothing
+       }
+       
        /**
         * Method to apply transformations.
         * 
@@ -444,39 +454,19 @@ public class DummycodeAgent extends Encoder
        }
 
        @Override
-       public double[] encode(String[] in, double[] out) {
-               //TODO
-               return null;
-       }
-
-       @Override
-       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               return apply(in, out);
-       }
-
-       @Override
-       public void build(String[] in) {
-               //do nothing
-       }
-
-       @Override
-       public void build(FrameBlock in) {
-               //do nothing
-       }
-
-       @Override
        public FrameBlock getMetaData(FrameBlock out) {
-               return null;
+               return out;
        }
        
-       public void initDomainSizes(FrameBlock meta) {
+       @Override
+       public void initMetaData(FrameBlock meta) {
                //initialize domain sizes and output num columns
                _domainSizes = new int[_colList.length];
                _dummycodedLength = _clen;
                for( int j=0; j<_colList.length; j++ ) {
                        int colID = _colList[j]; //1-based
                        _domainSizes[j] = 
(int)meta.getColumnMetadata().get(colID-1).getNumDistinct();
-                       _dummycodedLength +=  _domainSizes[j];
+                       _dummycodedLength +=  _domainSizes[j]-1;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
index 1aeb465..344693c 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
@@ -96,7 +96,7 @@ public class MVImputeAgent extends Encoder
        
        private String[] _replacementList = null;               // 
replacements: for global_mean, mean; and for global_mode, recode id of mode 
category
        private String[] _NAstrings = null;
-       
+       private List<Integer> _rcList = null; 
        
        public String[] getReplacements() { return _replacementList; }
        public KahanObject[] getMeans()   { return _meanList; }
@@ -890,12 +890,43 @@ public class MVImputeAgent extends Encoder
                }
        }
        
-       /**
-        * Method to apply transformations.
-        * 
-        * @param words
-        * @return
-        */
+       public MVMethod getMethod(int colID) {
+               int idx = isApplicable(colID);          
+               if(idx == -1)
+                       return MVMethod.INVALID;
+               
+               switch(_mvMethodList[idx])
+               {
+                       case 1: return MVMethod.GLOBAL_MEAN;
+                       case 2: return MVMethod.GLOBAL_MODE;
+                       case 3: return MVMethod.CONSTANT;
+                       default: return MVMethod.INVALID;
+               }
+               
+       }
+       
+       public long getNonMVCount(int colID) {
+               int idx = isApplicable(colID);
+               return (idx == -1) ? 0 : _countList[idx];
+       }
+       
+       public String getReplacement(int colID)  {
+               int idx = isApplicable(colID);          
+               return (idx == -1) ? null : _replacementList[idx];
+       }
+       
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+       
+       @Override
+       public void build(FrameBlock in) {
+               // TODO Auto-generated method stub
+               
+       }
+
        @Override
        public String[] apply(String[] words) 
        {       
@@ -926,32 +957,6 @@ public class MVImputeAgent extends Encoder
                return words;
        }
        
-       public MVMethod getMethod(int colID) {
-               int idx = isApplicable(colID);          
-               if(idx == -1)
-                       return MVMethod.INVALID;
-               
-               switch(_mvMethodList[idx])
-               {
-                       case 1: return MVMethod.GLOBAL_MEAN;
-                       case 2: return MVMethod.GLOBAL_MODE;
-                       case 3: return MVMethod.CONSTANT;
-                       default: return MVMethod.INVALID;
-               }
-               
-       }
-       
-       public long getNonMVCount(int colID) {
-               int idx = isApplicable(colID);
-               return (idx == -1) ? 0 : _countList[idx];
-       }
-       
-       public String getReplacement(int colID)  {
-               int idx = isApplicable(colID);          
-               return (idx == -1) ? null : _replacementList[idx];
-       }
-       
-
        @Override
        public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
                for(int i=0; i<in.getNumRows(); i++) {
@@ -965,26 +970,6 @@ public class MVImputeAgent extends Encoder
        }
        
        @Override
-       public double[] encode(String[] in, double[] out) {
-               // TODO Auto-generated method stub
-               return null;
-       }
-       @Override
-       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               // TODO Auto-generated method stub
-               return null;
-       }
-       @Override
-       public void build(String[] in) {
-               // TODO Auto-generated method stub
-               
-       }
-       @Override
-       public void build(FrameBlock in) {
-               // TODO Auto-generated method stub
-               
-       }
-       @Override
        public FrameBlock getMetaData(FrameBlock out) {
                // TODO Auto-generated method stub
                return null;
@@ -995,14 +980,14 @@ public class MVImputeAgent extends Encoder
         * @param meta
         * @param rcList
         */
-       public void initReplacementList(FrameBlock meta, List<Integer> rcList) {
+       public void initMetaData(FrameBlock meta) {
                //init replacement lists, replace recoded values to
                //apply mv imputation potentially after recoding
                _replacementList = new String[_colList.length];
                for( int j=0; j<_colList.length; j++ ) {
                        int colID = _colList[j];
                        String mvVal = 
UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); 
-                       if( rcList.contains(colID) ) {
+                       if( _rcList.contains(colID) ) {
                                Long mvVal2 = 
meta.getRecodeMap(colID-1).get(mvVal);
                                if( mvVal2 == null) 
                                        throw new RuntimeException("Missing 
recode value for impute value '"+mvVal+"'.");
@@ -1013,4 +998,12 @@ public class MVImputeAgent extends Encoder
                        }
                }
        }
+       
+       /**
+        * 
+        * @param rcList
+        */
+       public void initRecodeIDList(List<Integer> rcList) {
+               _rcList = rcList;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
index 539ff30..186ac7a 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
@@ -94,6 +94,16 @@ public class OmitAgent extends Encoder
        }
 
        @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               return apply(in, out);
+       }
+       
+       @Override
+       public void build(FrameBlock in) {      
+               //do nothing
+       }
+       
+       @Override
        public String[] apply(String[] words) {
                return null;
        }
@@ -133,29 +143,14 @@ public class OmitAgent extends Encoder
        }
 
        @Override
-       public double[] encode(String[] in, double[] out) {
-               return null;
-       }
-
-       @Override
-       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               return apply(in, out);
-       }
-
-       @Override
-       public void build(String[] in) {
-               //do nothing
-       }
-
-       @Override
-       public void build(FrameBlock in) {      
+       public FrameBlock getMetaData(FrameBlock out) {
                //do nothing
+               return out;
        }
-
+       
        @Override
-       public FrameBlock getMetaData(FrameBlock out) {
+       public void initMetaData(FrameBlock meta) {
                //do nothing
-               return out;
        }
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index a44ad5a..01d7c85 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -88,19 +88,6 @@ public class RecodeAgent extends Encoder
                }
        }
        
-       /**
-        * Construct the recodemaps from the given input frame for all 
-        * columns registered for recode.
-        * 
-        * @param frame
-        */
-       public void initRecodeMaps( FrameBlock frame ) {
-               for( int j=0; j<_colList.length; j++ ) {
-                       int colID = _colList[j]; //1-based
-                       _rcdMaps.put(colID, frame.getRecodeMap(colID-1));
-               }
-       }
-       
        public HashMap<Integer, HashMap<String,Long>> getCPRecodeMaps() { 
                return _rcdMaps; 
        }
@@ -382,6 +369,56 @@ public class RecodeAgent extends Encoder
        }       
 
        /**
+        * 
+        * @param colID
+        * @param key
+        * @return
+        */
+       private String lookupRCDMap(int colID, String key) {
+               if( _finalMaps!=null )
+                       return _finalMaps.get(colID).get(key);
+               else { //used for cp
+                       Long tmp = _rcdMaps.get(colID).get(key);
+                       return (tmp!=null) ? Long.toString(tmp) : null;
+               }
+       }
+       
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               if( !isApplicable() )
+                       return out;
+               
+               //build and apply recode maps 
+               build(in);
+               apply(in, out);
+               
+               return out;
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               if( !isApplicable() )
+                       return;         
+               
+               Iterator<String[]> iter = in.getStringRowIterator();
+               while( iter.hasNext() ) {
+                       String[] row = iter.next(); 
+                       for( int j=0; j<_colList.length; j++ ) {
+                               int colID = _colList[j]; //1-based
+                               //allocate column map if necessary
+                               if( !_rcdMaps.containsKey(colID) ) 
+                                       _rcdMaps.put(colID, new 
HashMap<String,Long>());
+                               //probe and build column map
+                               HashMap<String,Long> map = _rcdMaps.get(colID);
+                               String key = row[colID-1];
+                               if( !map.containsKey(key) )
+                                       map.put(key, new Long(map.size()+1));
+                       }
+               }
+       }
+       
+       /**
         * Method to apply transformations.
         * 
         * @param words
@@ -422,81 +459,11 @@ public class RecodeAgent extends Encoder
                
                return out;
        }
-       
-       /**
-        * 
-        * @param colID
-        * @param key
-        * @return
-        */
-       private String lookupRCDMap(int colID, String key) {
-               if( _finalMaps!=null )
-                       return _finalMaps.get(colID).get(key);
-               else { //used for cp
-                       Long tmp = _rcdMaps.get(colID).get(key);
-                       return (tmp!=null) ? Long.toString(tmp) : null;
-               }
-       }
-
-       @Override
-       public double[] encode(String[] in, double[] out) {
-               if( !isApplicable() )
-                       return out;
-               
-               //build and apply recode maps
-               build(in);
-               apply(in);
-               
-               //convert to double 
-               for( int j=0; j<_colList.length; j++ )
-                       out[_colList[j]-1] = 
Double.parseDouble(in[_colList[j]-1]);             
-               return out;
-       }
-
-       @Override
-       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               if( !isApplicable() )
-                       return out;
-               
-               //build and apply recode maps 
-               build(in);
-               apply(in, out);
-               
-               return out;
-       }
-
-       @Override
-       public void build(String[] in) {
-               if( !isApplicable() )
-                       return;
-               
-               for( int j=0; j<_colList.length; j++ ) {
-                       int colID = _colList[j]; //1-based
-                       //allocate column map if necessary
-                       if( !_rcdMaps.containsKey(colID) ) 
-                               _rcdMaps.put(colID, new HashMap<String,Long>());
-                       //probe and build column map
-                       HashMap<String,Long> map = _rcdMaps.get(colID);
-                       String key = in[colID-1];
-                       if( !map.containsKey(key) )
-                               map.put(key, new Long(map.size()+1));
-               }
-       }
-
-       @Override
-       public void build(FrameBlock in) {
-               if( !isApplicable() )
-                       return;         
-               
-               Iterator<String[]> iter = in.getStringRowIterator();
-               while( iter.hasNext() )
-                       build( iter.next() );
-       }
 
        @Override
-       public FrameBlock getMetaData(FrameBlock out) {
+       public FrameBlock getMetaData(FrameBlock meta) {
                if( !isApplicable() )
-                       return out;
+                       return meta;
                
                //inverse operation to initRecodeMaps
                
@@ -505,7 +472,7 @@ public class RecodeAgent extends Encoder
                for( int j=0; j<_colList.length; j++ )
                        if( _rcdMaps.containsKey(_colList[j]) )
                                maxDistinct = Math.max(maxDistinct, 
_rcdMaps.get(_colList[j]).size());
-               out.ensureAllocatedColumns(maxDistinct);
+               meta.ensureAllocatedColumns(maxDistinct);
                
                //create compact meta data representation
                for( int j=0; j<_colList.length; j++ ) {
@@ -513,12 +480,41 @@ public class RecodeAgent extends Encoder
                        int rowID = 0;
                        if( _rcdMaps.containsKey(_colList[j]) )
                                for( Entry<String, Long> e : 
_rcdMaps.get(colID).entrySet() ) {
-                                       String tmp = e.getKey() + 
Lop.DATATYPE_PREFIX + e.getValue().toString();
-                                       out.set(rowID++, colID-1, tmp); 
+                                       String tmp = 
constructRecodeMapEntry(e.getKey(), e.getValue());
+                                       meta.set(rowID++, colID-1, tmp); 
                                }
+                       meta.getColumnMetadata(colID-1).setNumDistinct(
+                                       _rcdMaps.get(colID).size());
                }
                
-               return out;
+               return meta;
+       }
+       
+
+       /**
+        * Construct the recodemaps from the given input frame for all 
+        * columns registered for recode.
+        * 
+        * @param frame
+        */
+       public void initMetaData( FrameBlock meta ) {
+               if( meta == null || meta.getNumRows()<=0 )
+                       return;
+               
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       _rcdMaps.put(colID, meta.getRecodeMap(colID-1));
+               }
+       }
+       
+       /**
+        * 
+        * @param token
+        * @param code
+        * @return
+        */
+       public static String constructRecodeMapEntry(String token, Long code) {
+               return token + Lop.DATATYPE_PREFIX + code.toString();
        }
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
index f66ed5e..6f7c7ee 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/decode/Decoder.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.transform.decode;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.sysml.parser.Expression.ValueType;
@@ -30,25 +31,19 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
  * interface for decoding matrices to frames.
  * 
  */
-public abstract class Decoder 
+public abstract class Decoder implements Serializable
 {      
-       protected List<ValueType> _schema = null;
+       private static final long serialVersionUID = -1732411001366177787L;
        
-       protected Decoder( List<ValueType> schema ) {
+       protected List<ValueType> _schema = null;
+       protected int[] _colList = null;
+               
+       protected Decoder( List<ValueType> schema, int[] colList ) {
                _schema = schema;
+               _colList = colList;
        }
        
        /**
-        * Row decode API converting a matrix row into a frame row
-        * of the specified decoder schema.
-        * 
-        * @param in
-        * @param out
-        * @return
-        */
-       public abstract Object[] decode(double[] in, Object[] out);
-       
-       /**
         * Block decode API converting a matrix block into a frame block.
         * 
         * @param in
@@ -57,4 +52,10 @@ public abstract class Decoder
         * @return returns given output frame block for convenience
         */
        public abstract FrameBlock decode(MatrixBlock in, FrameBlock out);
+       
+       /**
+        * 
+        * @param meta
+        */
+       public abstract void initMetaData(FrameBlock meta);
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
index b0b0909..a8f9233 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderComposite.java
@@ -34,29 +34,30 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
  */
 public class DecoderComposite extends Decoder
 {
+       private static final long serialVersionUID = 5790600547144743716L;
+       
        private List<Decoder> _decoders = null;
        
        protected DecoderComposite(List<ValueType> schema, List<Decoder> 
decoders) {
-               super(schema);
+               super(schema, null);
                _decoders = decoders;
        }
        
        protected DecoderComposite(List<ValueType> schema, Decoder[] decoders) {
-               super(schema);
+               super(schema, null);
                _decoders = Arrays.asList(decoders);
        }
 
        @Override
-       public Object[] decode(double[] in, Object[] out) {
+       public FrameBlock decode(MatrixBlock in, FrameBlock out) {
                for( Decoder decoder : _decoders )
-                       decoder.decode(in, out);
+                       out = decoder.decode(in, out);  
                return out;
        }
-
+       
        @Override
-       public FrameBlock decode(MatrixBlock in, FrameBlock out) {
+       public void initMetaData(FrameBlock meta) {
                for( Decoder decoder : _decoders )
-                       decoder.decode(in, out);        
-               return out;
+                       decoder.initMetaData(meta);     
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java
new file mode 100644
index 0000000..2916742
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderDummycode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.decode;
+
+import java.util.List;
+
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.UtilFunctions;
+
+/**
+ * Simple atomic decoder for dummycoded columns. This decoder builds internally
+ * inverted column mappings from the given frame meta data. 
+ *  
+ */
+public class DecoderDummycode extends Decoder
+{
+       private static final long serialVersionUID = 4758831042891032129L;
+       
+       private int[] _clPos = null;
+       private int[] _cuPos = null;
+       
+       protected DecoderDummycode(List<ValueType> schema, int[] dcCols) {
+               //dcCols refers to column IDs in output (non-dc)
+               super(schema, dcCols);
+       }
+
+       @Override
+       public FrameBlock decode(MatrixBlock in, FrameBlock out) {
+               out.ensureAllocatedColumns(in.getNumRows());
+               for( int i=0; i<in.getNumRows(); i++ )
+                       for( int j=0; j<_colList.length; j++ )
+                               for( int k=_clPos[j]; k<_cuPos[j]; k++ )
+                                       if( in.quickGetValue(i, k-1) != 0 ) {
+                                               int col = _colList[j] - 1;
+                                               out.set(i, col, 
UtilFunctions.doubleToObject(
+                                                               
out.getSchema().get(col), k-_clPos[j]+1));
+                                       }               
+               return out;
+       }
+
+       @Override
+       public void initMetaData(FrameBlock meta) {
+               _clPos = new int[_colList.length]; //col lower pos 
+               _cuPos = new int[_colList.length]; //col upper pos 
+               for( int j=0, off=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j];
+                       int ndist = (int)meta.getColumnMetadata()
+                                       .get(colID-1).getNumDistinct();
+                       _clPos[j] = off + colID;
+                       _cuPos[j] = _clPos[j] + ndist;
+                       off += ndist - 1;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
index d0e04f4..78f1ad2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
@@ -20,16 +20,18 @@
 package org.apache.sysml.runtime.transform.decode;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
-import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONObject;
 
 
@@ -43,6 +45,7 @@ public class DecoderFactory
         * @return
         * @throws DMLRuntimeException
         */
+       @SuppressWarnings("unchecked")
        public static Decoder createDecoder(String spec, List<ValueType> 
schema, FrameBlock meta) 
                throws DMLRuntimeException 
        {       
@@ -59,44 +62,34 @@ public class DecoderFactory
                                schema = 
Collections.nCopies(meta.getNumColumns(), ValueType.STRING);
                        }
                        
-                       //create decoders 'recode' and 'pass-through'
-                       if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE))  {
-                               JSONArray attrs = null;
-                               if( jSpec.get(TfUtils.TXMETHOD_RECODE) 
instanceof JSONObject ) {
-                                       JSONObject obj = (JSONObject) 
jSpec.get(TfUtils.TXMETHOD_RECODE);
-                                       attrs = (JSONArray) 
obj.get(TfUtils.JSON_ATTRS);
-                               }
-                               else
-                                       attrs = 
(JSONArray)jSpec.get(TfUtils.TXMETHOD_RECODE);
-                               
-                               //recode decoder
-                               int[] rcCols = new int[attrs.size()]; 
-                               for(int j=0; j<rcCols.length; j++) 
-                                       rcCols[j] = 
UtilFunctions.toInt(attrs.get(j))-1;
-                               ldecoders.add(new DecoderRecode(schema, meta, 
rcCols));
-                               
-                               //pass-through decode (non-recode columns)
-                               if( schema.size() > attrs.size() ) {
-                                       int[] ptCols = new 
int[schema.size()-attrs.size()]; 
-                                       HashSet<Integer> probe = new 
HashSet<Integer>();
-                                       for( int j=0; j<rcCols.length; j++ )
-                                               probe.add(rcCols[j]);
-                                       for( int j=0, pos=0; j<schema.size(); 
j++ )
-                                               if( !probe.contains(j) )
-                                                       ptCols[pos++] = j;
-                                       ldecoders.add(new 
DecoderPassThrough(schema, ptCols));  
-                               }
+                       //create decoders 'recode', 'dummy' and 'pass-through'
+                       List<Integer> rcIDs = Arrays.asList(ArrayUtils.toObject(
+                                       TfMetaUtils.parseJsonIDList(jSpec, 
TfUtils.TXMETHOD_RECODE)));
+                       List<Integer> dcIDs = Arrays.asList(ArrayUtils.toObject(
+                                       TfMetaUtils.parseJsonIDList(jSpec, 
TfUtils.TXMETHOD_DUMMYCODE))); 
+                       rcIDs = new 
ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs));
+                       List<Integer> ptIDs = new 
ArrayList<Integer>(CollectionUtils
+                                       
.subtract(UtilFunctions.getSequenceList(1, schema.size(), 1), rcIDs)); 
+                       
+                       if( !dcIDs.isEmpty() ) {
+                               ldecoders.add(new DecoderDummycode(schema, 
+                                               
ArrayUtils.toPrimitive(dcIDs.toArray(new Integer[0]))));
+                       }
+                       if( !rcIDs.isEmpty() ) {
+                               ldecoders.add(new DecoderRecode(schema, 
!dcIDs.isEmpty(),
+                                               
ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0]))));
                        }
-                       //create full 'pass-through' decoder if necessary
-                       else {
-                               int[] ptCols = new int[schema.size()];
-                               for( int j=0; j<ptCols.length; j++ )
-                                       ptCols[j] = j;
-                               ldecoders.add(new DecoderPassThrough(schema, 
ptCols));
+                       if( !ptIDs.isEmpty() ) {
+                               ldecoders.add(new DecoderPassThrough(schema, 
+                                               
ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])),
+                                               
ArrayUtils.toPrimitive(dcIDs.toArray(new Integer[0]))));        
                        }
                        
                        //create composite decoder of all created decoders
+                       //and initialize with given meta data (recode, dummy, 
bin)
                        decoder = new DecoderComposite(schema, ldecoders);
+                       if( meta != null )
+                               decoder.initMetaData(meta);
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java
 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java
index 0333504..d2bf7fa 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderPassThrough.java
@@ -33,30 +33,52 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  */
 public class DecoderPassThrough extends Decoder
 {
-       private int[] _ptCols = null; //0-based
+       private static final long serialVersionUID = -8525203889417422598L;
        
-       protected DecoderPassThrough(List<ValueType> schema, int[] ptCols) {
-               super(schema);
-               _ptCols = ptCols;
-       }
-
-       @Override
-       public Object[] decode(double[] in, Object[] out) {
-               for( int j=0; j<_ptCols.length; j++ )
-                       out[_ptCols[j]] = in[_ptCols[j]];
-               return out;
+       private int[] _dcCols = null;
+       private int[] _srcCols = null;
+       
+       protected DecoderPassThrough(List<ValueType> schema, int[] ptCols, 
int[] dcCols) {
+               super(schema, ptCols);
+               _dcCols = dcCols;
        }
 
        @Override
        public FrameBlock decode(MatrixBlock in, FrameBlock out) {
                out.ensureAllocatedColumns(in.getNumRows());
                for( int i=0; i<in.getNumRows(); i++ ) {
-                       for( int j=0; j<_ptCols.length; j++ ) {
-                               double val = in.quickGetValue(i, _ptCols[j]);
-                               out.set(i, _ptCols[j], 
UtilFunctions.doubleToObject(
-                                               _schema.get(_ptCols[j]), val));
+                       for( int j=0; j<_colList.length; j++ ) {
+                               int srcColID = _srcCols[j];
+                               int tgtColID = _colList[j];
+                               double val = in.quickGetValue(i, srcColID-1);
+                               out.set(i, tgtColID-1, 
UtilFunctions.doubleToObject(
+                                               _schema.get(tgtColID-1), val));
                        }
                }
                return out;
        }
+       
+       @Override
+       public void initMetaData(FrameBlock meta) {
+               if( _dcCols.length > 0 ) {
+                       //prepare source column id mapping w/ dummy coding
+                       _srcCols = new int[_colList.length];
+                       int ix1 = 0, ix2 = 0, off = 0;
+                       while( ix1<_colList.length ) {
+                               if( ix2>=_dcCols.length || _colList[ix1] < 
_dcCols[ix2] ) {
+                                       _srcCols[ix1] = _colList[ix1] + off;
+                                       ix1 ++;
+                               }
+                               else { //_colList[ix1] > _dcCols[ix2]
+                                       off += (int)meta.getColumnMetadata()
+                                                       
.get(_dcCols[ix2]-1).getNumDistinct() - 1;
+                                       ix2 ++;
+                               }
+                       }
+               }
+               else {
+                       //prepare direct source column mapping
+                       _srcCols = _colList;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java 
b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
index f554f8b..5484ded 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderRecode.java
@@ -37,54 +37,63 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  */
 public class DecoderRecode extends Decoder
 {
-       private int[] _rcCols = null; //0-based
+       private static final long serialVersionUID = -3784249774608228805L;
+
        private HashMap<Long, Object>[] _rcMaps = null;
+       private boolean _onOut = false;
        
-       @SuppressWarnings("unchecked")
-       protected DecoderRecode(List<ValueType> schema, FrameBlock meta, int[] 
rcCols) {
-               super(schema);
-               
-               //initialize recode maps according to schema
-               _rcCols = rcCols;
-               _rcMaps = new HashMap[_rcCols.length];
-               for( int j=0; j<_rcCols.length; j++ ) {
-                       HashMap<Long, Object> map = new HashMap<Long, Object>();
-                       for( int i=0; i<meta.getNumRows(); i++ ) {
-                               if( meta.get(i, _rcCols[j])==null )
-                                       break; //reached end of recode map
-                               String[] tmp = meta.get(i, 
_rcCols[j]).toString().split(Lop.DATATYPE_PREFIX);                           
-                               Object obj = 
UtilFunctions.stringToObject(schema.get(_rcCols[j]), tmp[0]);
-                               map.put(Long.parseLong(tmp[1]), obj);           
                
-                       }
-                       _rcMaps[j] = map;
-               }
+       protected DecoderRecode(List<ValueType> schema, boolean onOut, int[] 
rcCols) {
+               super(schema, rcCols);
+               _onOut = onOut;
        }
 
        @Override
-       public Object[] decode(double[] in, Object[] out) {
-               for( int j=0; j<_rcCols.length; j++ ) {
-                       long key = UtilFunctions.toLong(in[_rcCols[j]]);
-                       out[_rcCols[j]] = _rcMaps[j].get(key);
+       public FrameBlock decode(MatrixBlock in, FrameBlock out) {
+               if( _onOut ) { //recode on output (after dummy)
+                       for( int i=0; i<in.getNumRows(); i++ ) {
+                               for( int j=0; j<_colList.length; j++ ) {
+                                       int colID = _colList[j];
+                                       double val = 
UtilFunctions.objectToDouble(
+                                                       
out.getSchema().get(colID-1), out.get(i, colID-1));
+                                       long key = UtilFunctions.toLong(val);
+                                       out.set(i, colID-1, 
_rcMaps[j].get(key));
+                               }
+                       }
+               }
+               else { //recode on input (no dummy)
+                       out.ensureAllocatedColumns(in.getNumRows());
+                       for( int i=0; i<in.getNumRows(); i++ ) {
+                               for( int j=0; j<_colList.length; j++ ) {
+                                       double val = in.quickGetValue(i, 
_colList[j]-1);
+                                       long key = UtilFunctions.toLong(val);
+                                       out.set(i, _colList[j]-1, 
_rcMaps[j].get(key));
+                               }
+                       }
                }
                return out;
        }
 
        @Override
-       public FrameBlock decode(MatrixBlock in, FrameBlock out) {
-               out.ensureAllocatedColumns(in.getNumRows());
-               for( int i=0; i<in.getNumRows(); i++ ) {
-                       for( int j=0; j<_rcCols.length; j++ ) {
-                               double val = in.quickGetValue(i, _rcCols[j]);
-                               long key = UtilFunctions.toLong(val);
-                               out.set(i, _rcCols[j], _rcMaps[j].get(key));
+       @SuppressWarnings("unchecked")
+       public void initMetaData(FrameBlock meta) {
+               //initialize recode maps according to schema
+               _rcMaps = new HashMap[_colList.length];
+               for( int j=0; j<_colList.length; j++ ) {
+                       HashMap<Long, Object> map = new HashMap<Long, Object>();
+                       for( int i=0; i<meta.getNumRows(); i++ ) {
+                               if( meta.get(i, _colList[j]-1)==null )
+                                       break; //reached end of recode map
+                               String[] tmp = meta.get(i, 
_colList[j]-1).toString().split(Lop.DATATYPE_PREFIX);                           
     
+                               Object obj = 
UtilFunctions.stringToObject(_schema.get(_colList[j]-1), tmp[0]);
+                               map.put(Long.parseLong(tmp[1]), obj);           
                
                        }
+                       _rcMaps[j] = map;
                }
-               return out;
        }
        
        /**
         * Parses a line of <token, ID, count> into <token, ID> pairs, where 
-        * quoted tokens (potentially including separators) are supportd.
+        * quoted tokens (potentially including separators) are supported.
         * 
         * @param entry
         * @param pair

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
index ac01357..b5f81d0 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
@@ -110,15 +110,6 @@ public abstract class Encoder implements Serializable
        }
        
        /**
-        * Row encode: build and apply (transform encode).
-        * 
-        * @param in
-        * @param out
-        * @return
-        */
-       public abstract double[] encode(String[] in, double[] out);
-       
-       /**
         * Block encode: build and apply (transform encode).
         * 
         * @param in
@@ -128,14 +119,6 @@ public abstract class Encoder implements Serializable
        public abstract MatrixBlock encode(FrameBlock in, MatrixBlock out);
 
        /**
-        * Build the transform meta data for given row input. This call modifies
-        * and keeps meta data as encoder state.
-        * 
-        * @param in
-        */
-       public abstract void build(String[] in);
-       
-       /**
         * Build the transform meta data for the given block input. This call 
modifies
         * and keeps meta data as encoder state.
         * 
@@ -144,30 +127,38 @@ public abstract class Encoder implements Serializable
        public abstract void build(FrameBlock in);
        
        /**
-        * Construct a frame block out of the transform meta data.
+        * Encode input data blockwise according to existing transform meta
+        * data (transform apply).
         * 
+        * @param in
+        * @param out
         * @return
         */
-       public abstract FrameBlock getMetaData(FrameBlock out);
+       public abstract MatrixBlock apply(FrameBlock in, MatrixBlock out);
        
        /**
         * Encode input data according to existing transform meta
         * data (transform apply).
+        * TODO remove once file-based transform removed
         * 
         * @param in
         * @return
         */
        public abstract String[] apply(String[] in);
-       
+
        /**
-        * Encode input data blockwise according to existing transform meta
-        * data (transform apply).
+        * Construct a frame block out of the transform meta data.
         * 
-        * @param in
-        * @param out
         * @return
         */
-       public abstract MatrixBlock apply(FrameBlock in, MatrixBlock out);
+       public abstract FrameBlock getMetaData(FrameBlock out);
+
+       /**
+        * Sets up the required meta data for a subsequent call to apply.
+        * 
+        * @param meta
+        */
+       public abstract void initMetaData(FrameBlock meta);
        
        
        //OLD API: kept for a transition phase only

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index 1c27350..bafa655 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.transform.DistinctValue;
@@ -45,6 +46,7 @@ public class EncoderComposite extends Encoder
        private static final long serialVersionUID = -8473768154646831882L;
        
        private List<Encoder> _encoders = null;
+       private FrameBlock _meta = null;
        
        protected EncoderComposite(List<Encoder> encoders) {
                super(null, -1);
@@ -63,25 +65,29 @@ public class EncoderComposite extends Encoder
                        clen = Math.max(clen, encoder.getNumCols());
                return clen;
        }
-       
-       @Override
-       public double[] encode(String[] in, double[] out) {
-               for( Encoder encoder : _encoders )
-                       out = encoder.encode(in, out);
-               return out;
-       }
 
+       public List<Encoder> getEncoders() {
+               return _encoders;
+       }
+       
        @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               //build meta data first (for all encoders)
                for( Encoder encoder : _encoders )
-                       out = encoder.encode(in, out);
-               return out;
-       }
-
-       @Override
-       public void build(String[] in) {
+                       encoder.build(in);
+               
+               //propagate meta data 
+               _meta = new FrameBlock(in.getNumColumns(), ValueType.STRING);
+               for( Encoder encoder : _encoders ) {
+                       encoder.initMetaData(_meta);
+                       _meta = encoder.getMetaData(_meta);
+               }
+               
+               //apply meta data
                for( Encoder encoder : _encoders )
-                       encoder.build(in);              
+                       out = encoder.apply(in, out);
+                       
+               return out;
        }
 
        @Override
@@ -90,13 +96,7 @@ public class EncoderComposite extends Encoder
                        encoder.build(in);
        }
 
-       @Override
-       public FrameBlock getMetaData(FrameBlock out) {
-               for( Encoder encoder : _encoders )
-                       encoder.getMetaData(out);
-               return out;
-       }
-       
+
        @Override
        public String[] apply(String[] in) {
                for( Encoder encoder : _encoders )
@@ -110,6 +110,21 @@ public class EncoderComposite extends Encoder
                        out = encoder.apply(in, out);
                return out;
        }
+       
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               if( _meta != null )
+                       return _meta;
+               for( Encoder encoder : _encoders )
+                       encoder.getMetaData(out);
+               return out;
+       }
+       
+       @Override
+       public void initMetaData(FrameBlock out) {
+               for( Encoder encoder : _encoders )
+                       encoder.initMetaData(out);
+       }
 
        @Override
        public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
index c3959ca..ae98bae 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -105,38 +105,28 @@ public class EncoderFactory
                        if( !rcIDs.isEmpty() ) {
                                RecodeAgent ra = new RecodeAgent(jSpec, clen);
                                
ra.setColList(ArrayUtils.toPrimitive(rcIDs.toArray(new Integer[0])));
-                               if( meta != null )
-                                       ra.initRecodeMaps(meta);
                                lencoders.add(ra);      
                        }
-                       if( !ptIDs.isEmpty() ) {
+                       if( !ptIDs.isEmpty() )
                                lencoders.add(new EncoderPassThrough(
                                                
ArrayUtils.toPrimitive(ptIDs.toArray(new Integer[0])), clen));  
-                       }
-                       if( !dcIDs.isEmpty() ) {
-                               DummycodeAgent da = new DummycodeAgent(jSpec, 
schema.size());
-                               if( meta != null )
-                                       da.initDomainSizes(meta);
-                               lencoders.add(da);
-                       }
-                       if( !binIDs.isEmpty() ) {
-                               BinAgent ba = new BinAgent(jSpec, 
schema.size(), true);
-                               if( meta != null )
-                                       ba.initBins(meta);
-                               lencoders.add(ba);
-                       }
-                       if( !oIDs.isEmpty() ) {
+                       if( !dcIDs.isEmpty() )
+                               lencoders.add(new DummycodeAgent(jSpec, 
schema.size()));
+                       if( !binIDs.isEmpty() )
+                               lencoders.add(new BinAgent(jSpec, 
schema.size(), true));
+                       if( !oIDs.isEmpty() )
                                lencoders.add(new OmitAgent(jSpec, 
schema.size()));
-                       }
                        if( !mvIDs.isEmpty() ) {
                                MVImputeAgent ma = new MVImputeAgent(jSpec, 
schema.size());
-                               if( meta != null )
-                                       ma.initReplacementList(meta, rcIDs);
+                               ma.initRecodeIDList(rcIDs);
                                lencoders.add(ma);
                        }
                        
-                       //create composite decoder of all created decoders
+                       //create composite decoder of all created encoders
+                       //and initialize meta data (recode, dummy, bin, mv)
                        encoder = new EncoderComposite(lencoders);
+                       if( meta != null )
+                               encoder.initMetaData(meta);
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
index 445d019..ab146ce 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderPassThrough.java
@@ -49,37 +49,16 @@ public class EncoderPassThrough extends Encoder
        }
 
        @Override
-       public double[] encode(String[] in, double[] out) {
-               for( int j=0; j<_colList.length; j++ ) {
-                       String tmp = in[_colList[j]-1];
-                       out[_colList[j]-1] = (tmp==null) ? 0 : 
-                               Double.parseDouble(tmp);
-               }
-               
-               return out;
-       }
-
-       @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
                return apply(in, out);
        }
 
        @Override
-       public void build(String[] in) {
-               //do nothing    
-       }
-
-       @Override
        public void build(FrameBlock in) {
                //do nothing
        }
 
        @Override
-       public FrameBlock getMetaData(FrameBlock out) {
-               return null;
-       }
-       
-       @Override
        public String[] apply(String[] in) {
                return in;
        }
@@ -101,6 +80,18 @@ public class EncoderPassThrough extends Encoder
        }
 
        @Override
+       public FrameBlock getMetaData(FrameBlock meta) {
+               //do nothing
+               return meta;
+       }
+       
+       @Override
+       public void initMetaData(FrameBlock meta) {
+               //do nothing
+       }
+       
+       
+       @Override
        public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException {
                throw new RuntimeException("File-based api not supported.");
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/293c81c6/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java 
b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
index b9feb5d..82396f6 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/meta/TfMetaUtils.java
@@ -132,7 +132,10 @@ public class TfMetaUtils
                        for(int j=0; j<colspecs.size(); j++) {
                                JSONObject colspec = (JSONObject) 
colspecs.get(j);
                                colList[j] = colspec.getInt("id");
-                       }       
+                       }
+                       
+                       //ensure ascending order of column IDs
+                       Arrays.sort(colList);
                }
                
                return colList;


Reply via email to