[SYSTEMML-569] Extended spark transformencode on frames (mv, omit, bin)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/172bfcac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/172bfcac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/172bfcac

Branch: refs/heads/master
Commit: 172bfcacc0b260c49c18c9e26d2dfc81f7e3051e
Parents: 12f2da9
Author: Matthias Boehm <[email protected]>
Authored: Thu Jul 7 21:52:31 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jul 8 10:59:18 2016 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/parser/DataExpression.java |   1 +
 ...ReturnParameterizedBuiltinSPInstruction.java | 156 +++++++++++++++-
 .../sysml/runtime/matrix/data/FrameBlock.java   |   5 +-
 .../sysml/runtime/transform/BinAgent.java       |   3 +-
 .../sysml/runtime/transform/MVImputeAgent.java  | 184 +++++++++++++------
 .../sysml/runtime/transform/RecodeAgent.java    |   2 +-
 .../transform/encode/EncoderComposite.java      |   6 +-
 .../sysml/runtime/util/UtilFunctions.java       |   2 +-
 .../TransformFrameEncodeApplyTest.java          | 178 ++++++++++++++++++
 .../transform/TransformFrameEncodeApply.dml     |  34 ++++
 .../functions/transform/ZPackageSuite.java      |   1 +
 11 files changed, 498 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/parser/DataExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java 
b/src/main/java/org/apache/sysml/parser/DataExpression.java
index c33f965..96089f0 100644
--- a/src/main/java/org/apache/sysml/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysml/parser/DataExpression.java
@@ -763,6 +763,7 @@ public class DataExpression extends DataIdentifier
                        // if the MTD file exists, check the values specified 
in read statement match values in metadata MTD file
                        if (configObject != null){
                                parseMetaDataFileParameters(mtdFileName, 
configObject, conditional);
+                               inferredFormatType = true;
                        }
                        else {
                                LOG.warn("Metadata file: " + new 
Path(mtdFileName) + " not provided");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index fc9e9ce..a53f673 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import org.apache.spark.Accumulator;
 import org.apache.spark.AccumulatorParam;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -38,8 +39,10 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyFunction;
 import 
org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction.RDDTransformApplyOffsetFunction;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
@@ -48,10 +51,13 @@ import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.FrameBlock.ColumnMetadata;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.transform.MVImputeAgent;
+import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.transform.RecodeAgent;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.transform.encode.EncoderComposite;
@@ -126,12 +132,19 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                        fo.getSchema(), 
(int)fo.getNumColumns(), null);
                        
                        Accumulator<Long> accMax = 
sec.getSparkContext().accumulator(0L, new MaxAcc()); 
-                       in.mapPartitionsToPair(new 
TransformEncodeBuildFunction(encoderBuild))
-                         .distinct().groupByKey()
-                         .flatMap(new TransformEncodeGroupFunction(accMax))
-                         .saveAsTextFile(fometa.getFileName()); //trigger eval
+                       JavaRDD<String> rcMaps = in
+                                       .mapPartitionsToPair(new 
TransformEncodeBuildFunction(encoderBuild))
+                                       .distinct().groupByKey()
+                                       .flatMap(new 
TransformEncodeGroupFunction(accMax));
+                       if( containsMVImputeEncoder(encoderBuild) ) {
+                               MVImputeAgent mva = 
getMVImputeEncoder(encoderBuild);
+                               rcMaps = rcMaps.union(
+                                               in.mapPartitionsToPair(new 
TransformEncodeBuild2Function(mva))
+                                                 .groupByKey().flatMap(new 
TransformEncodeGroup2Function(mva)) );
+                       }
+                       rcMaps.saveAsTextFile(fometa.getFileName()); //trigger 
eval
                        
-                       //reuse multi-threaded reader 
+                       //consolidate meta data frame (reuse multi-threaded 
reader, special handling of missing values) 
                        FrameReader reader = 
FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo);
                        FrameBlock meta = 
reader.readFrameFromHDFS(fometa.getFileName(), accMax.value(), 
fo.getNumColumns());
                        meta.recomputeColumnCardinality(); //recompute num 
distinct items per column
@@ -169,6 +182,32 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
        
        /**
         * 
+        * @param encoder
+        * @return
+        */
+       private boolean containsMVImputeEncoder(Encoder encoder) {
+               if( encoder instanceof EncoderComposite )
+                       for( Encoder cencoder : 
((EncoderComposite)encoder).getEncoders() )
+                               if( cencoder instanceof MVImputeAgent )
+                                       return true;
+               return false;   
+       }
+       
+       /**
+        * 
+        * @param encoder
+        * @return
+        */
+       private MVImputeAgent getMVImputeEncoder(Encoder encoder) {
+               if( encoder instanceof EncoderComposite )
+                       for( Encoder cencoder : 
((EncoderComposite)encoder).getEncoders() )
+                               if( cencoder instanceof MVImputeAgent )
+                                       return (MVImputeAgent) cencoder;
+               return null;    
+       }
+       
+       /**
+        * 
         */
        public static class TransformEncodeBuildFunction implements 
PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, String>
        {
@@ -266,4 +305,111 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                        return Math.max(arg0, arg1);    
                }
        }
+       
+       /**
+        * 
+        */
+       public static class TransformEncodeBuild2Function implements 
PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, ColumnMetadata>
+       {
+               private static final long serialVersionUID = 
6336375833412029279L;
+
+               private MVImputeAgent _encoder = null;
+               
+               public TransformEncodeBuild2Function(MVImputeAgent encoder) {
+                       _encoder = encoder;
+               }
+               
+               @Override
+               public Iterable<Tuple2<Integer, ColumnMetadata>> 
call(Iterator<Tuple2<Long, FrameBlock>> iter)
+                       throws Exception 
+               {
+                       //build meta data (e.g., histograms and means)
+                       while( iter.hasNext() ) {
+                               FrameBlock block = iter.next()._2();
+                               _encoder.build(block);  
+                       }
+                       
+                       //extract meta data
+                       ArrayList<Tuple2<Integer,ColumnMetadata>> ret = new 
ArrayList<Tuple2<Integer,ColumnMetadata>>();
+                       int[] collist = _encoder.getColList();
+                       for( int j=0; j<collist.length; j++ ) {
+                               if( _encoder.getMethod(collist[j]) == 
MVMethod.GLOBAL_MODE ) {
+                                       HashMap<String,Long> hist = 
_encoder.getHistogram(collist[j]);
+                                       for( Entry<String,Long> e : 
hist.entrySet() )
+                                               ret.add(new 
Tuple2<Integer,ColumnMetadata>(collist[j], 
+                                                               new 
ColumnMetadata(e.getValue(), e.getKey())));
+                               }
+                               else if( _encoder.getMethod(collist[j]) == 
MVMethod.GLOBAL_MEAN ) {
+                                       ret.add(new 
Tuple2<Integer,ColumnMetadata>(collist[j], 
+                                                       new 
ColumnMetadata(_encoder.getNonMVCount(collist[j]), 
String.valueOf(_encoder.getMeans()[j]._sum))));
+                               }
+                               else if( _encoder.getMethod(collist[j]) == 
MVMethod.CONSTANT ) {
+                                       ret.add(new 
Tuple2<Integer,ColumnMetadata>(collist[j],
+                                                       new ColumnMetadata(0, 
_encoder.getReplacement(collist[j]))));
+                               }
+                       }
+                       
+                       return ret;
+               }
+       }
+       
+       /**
+        * 
+        */
+       public static class TransformEncodeGroup2Function implements 
FlatMapFunction<Tuple2<Integer, Iterable<ColumnMetadata>>, String>
+       {
+               private static final long serialVersionUID = 
702100641492347459L;
+               
+               private MVImputeAgent _encoder = null;
+               
+               public TransformEncodeGroup2Function(MVImputeAgent encoder) {   
+                       _encoder = encoder;
+               }
+
+               @Override
+               public Iterable<String> call(Tuple2<Integer, 
Iterable<ColumnMetadata>> arg0)
+                               throws Exception 
+               {
+                       int colix = arg0._1();
+                       Iterator<ColumnMetadata> iter = arg0._2().iterator();
+                       ArrayList<String> ret = new ArrayList<String>();
+                       
+                       //compute global mode of categorical feature, i.e., 
value with highest frequency
+                       if( _encoder.getMethod(colix) == MVMethod.GLOBAL_MODE ) 
{
+                               HashMap<String, Long> hist = new 
HashMap<String,Long>();
+                               while( iter.hasNext() ) {
+                                       ColumnMetadata cmeta = iter.next(); 
+                                       Long tmp = hist.get(cmeta.getMvValue());
+                                       hist.put(cmeta.getMvValue(), 
cmeta.getNumDistinct() + ((tmp!=null)?tmp:0));
+                               }
+                               long max = Long.MIN_VALUE; String mode = null;
+                               for( Entry<String, Long> e : hist.entrySet() ) 
+                                       if( e.getValue() > max  ) {
+                                               mode = e.getKey();
+                                               max = e.getValue();
+                                       }
+                               ret.add("-2 " + colix + " " + mode);
+                       }
+                       //compute global mean of numerical feature
+                       else if( _encoder.getMethod(colix) == 
MVMethod.GLOBAL_MEAN ) {
+                               KahanObject kbuff = new KahanObject(0, 0);
+                               KahanPlus kplus = 
KahanPlus.getKahanPlusFnObject();
+                               int count = 0;
+                               while( iter.hasNext() ) {
+                                       ColumnMetadata cmeta = iter.next(); 
+                                       kplus.execute2(kbuff, 
Double.parseDouble(cmeta.getMvValue()));
+                                       count += cmeta.getNumDistinct();
+                               }
+                               if( count > 0 )
+                                       ret.add("-2 " + colix + " " + 
String.valueOf(kbuff._sum/count));
+                       }
+                       //pass-through constant label
+                       else if( _encoder.getMethod(colix) == MVMethod.CONSTANT 
) {
+                               if( iter.hasNext() )
+                                       ret.add("-2 " + colix + " " + 
iter.next().getMvValue());
+                       }
+                       
+                       return ret;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 2088e85..051ce58 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -25,6 +25,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1256,7 +1257,9 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
        /**
         * 
         */
-       public static class ColumnMetadata {
+       public static class ColumnMetadata implements Serializable {
+               private static final long serialVersionUID = 
-90094082422100311L;
+               
                private long _ndistinct = 0;
                private String _mvValue = null;
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index fe83627..ad7cbfc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -375,8 +375,7 @@ public class BinAgent extends Encoder
 
        @Override
        public FrameBlock getMetaData(FrameBlock meta) {
-               // TODO Auto-generated method stub
-               return null;
+               return meta;
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
index 344693c..68896ac 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
@@ -30,6 +30,7 @@ import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -67,15 +68,8 @@ public class MVImputeAgent extends Encoder
        
        public enum MVMethod { INVALID, GLOBAL_MEAN, GLOBAL_MODE, CONSTANT };
        
-       /* 
-        * Imputation Methods:
-        * 1 - global_mean
-        * 2 - global_mode
-        * 3 - constant
-        * 
-        */
-       private byte[] _mvMethodList = null;
-       private byte[] _mvscMethodList = null;  // scaling methods for 
attributes that are imputed and also scaled
+       private MVMethod[] _mvMethodList = null;
+       private MVMethod[] _mvscMethodList = null;      // scaling methods for 
attributes that are imputed and also scaled
        
        private BitSet _isMVScaled = null;
        private CM _varFn = CM.getCMFnObject(AggregateOperationTypes.VARIANCE); 
        // function object that understands variance computation
@@ -86,10 +80,9 @@ public class MVImputeAgent extends Encoder
        private long[] _countList = null;                               // #of 
non-missing values
        
        private CM_COV_Object[] _varList = null;                // column-level 
variances, computed so far (for scaling)
-       
 
        private int[]                   _scnomvList = null;                     
// List of attributes that are scaled but not imputed
-       private byte[]                  _scnomvMethodList = null;       // 
scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
+       private MVMethod[]              _scnomvMethodList = null;       // 
scaling methods: 0 for invalid; 1 for mean-subtraction; 2 for z-scoring
        private KahanObject[]   _scnomvMeanList = null;         // column-level 
means, for attributes scaled but not imputed
        private long[]                  _scnomvCountList = null;        // #of 
non-missing values, for attributes scaled but not imputed
        private CM_COV_Object[] _scnomvVarList = null;          // column-level 
variances, computed so far
@@ -97,6 +90,7 @@ public class MVImputeAgent extends Encoder
        private String[] _replacementList = null;               // 
replacements: for global_mean, mean; and for global_mode, recode id of mode 
category
        private String[] _NAstrings = null;
        private List<Integer> _rcList = null; 
+       private HashMap<Integer,HashMap<String,Long>> _hist = null;
        
        public String[] getReplacements() { return _replacementList; }
        public KahanObject[] getMeans()   { return _meanList; }
@@ -108,9 +102,16 @@ public class MVImputeAgent extends Encoder
                throws JSONException
        {
                super(null, clen);
+               
+               //handle column list
                int[] collist = TfMetaUtils.parseJsonObjectIDList(parsedSpec, 
TfUtils.TXMETHOD_IMPUTE);
                initColList(collist);
        
+               //handle method list
+               parseMethodsAndReplacments(parsedSpec);
+               
+               //create reuse histograms
+               _hist = new HashMap<Integer, HashMap<String,Long>>();
        }
                        
        public MVImputeAgent(JSONObject parsedSpec, String[] NAstrings, int 
clen)
@@ -136,7 +137,7 @@ public class MVImputeAgent extends Encoder
                        int mvLength = mvattrs.size();
                        
                        _colList = new int[mvLength];
-                       _mvMethodList = new byte[mvLength];
+                       _mvMethodList = new MVMethod[mvLength];
                        
                        _meanList = new KahanObject[mvLength];
                        _countList = new long[mvLength];
@@ -147,7 +148,7 @@ public class MVImputeAgent extends Encoder
                        
                        for(int i=0; i < _colList.length; i++) {
                                _colList[i] = 
UtilFunctions.toInt(mvattrs.get(i));
-                               _mvMethodList[i] = (byte) 
UtilFunctions.toInt(mvmthds.get(i)); 
+                               _mvMethodList[i] = 
MVMethod.values()[UtilFunctions.toInt(mvmthds.get(i))]; 
                                _meanList[i] = new KahanObject(0, 0);
                        }
                        
@@ -173,7 +174,7 @@ public class MVImputeAgent extends Encoder
                else
                {
                        if ( _colList != null ) 
-                               _mvscMethodList = new byte[_colList.length];
+                               _mvscMethodList = new MVMethod[_colList.length];
                        
                        JSONObject scobj = (JSONObject) 
parsedSpec.get(TfUtils.TXMETHOD_SCALE);
                        JSONArray scattrs = (JSONArray) 
scobj.get(TfUtils.JSON_ATTRS);
@@ -195,7 +196,7 @@ public class MVImputeAgent extends Encoder
                                if(mvidx != -1)
                                {
                                        _isMVScaled.set(mvidx);
-                                       _mvscMethodList[mvidx] = mthd;
+                                       _mvscMethodList[mvidx] = 
MVMethod.values()[mthd];
                                        _varList[mvidx] = new CM_COV_Object();
                                }
                                else
@@ -205,7 +206,7 @@ public class MVImputeAgent extends Encoder
                        if(scnomv > 0)
                        {
                                _scnomvList = new int[scnomv];                  
-                               _scnomvMethodList = new byte[scnomv];   
+                               _scnomvMethodList = new MVMethod[scnomv];       
        
                                _scnomvMeanList = new KahanObject[scnomv];
                                _scnomvCountList = new long[scnomv];
@@ -219,7 +220,7 @@ public class MVImputeAgent extends Encoder
                                        if(isApplicable(colID) == -1)
                                        {       // scaled but not imputed
                                                _scnomvList[idx] = colID;
-                                               _scnomvMethodList[idx] = mthd;
+                                               _scnomvMethodList[idx] = 
MVMethod.values()[mthd];
                                                _scnomvMeanList[idx] = new 
KahanObject(0, 0);
                                                _scnomvVarList[idx] = new 
CM_COV_Object();
                                                idx++;
@@ -229,6 +230,28 @@ public class MVImputeAgent extends Encoder
                }
        }
        
+       /**
+        * 
+        * @param parsedSpec
+        * @throws JSONException
+        */
+       private void parseMethodsAndReplacments(JSONObject parsedSpec) throws 
JSONException {
+               JSONArray mvspec = (JSONArray) 
parsedSpec.get(TfUtils.TXMETHOD_IMPUTE);
+               _mvMethodList = new MVMethod[mvspec.size()];
+               _replacementList = new String[mvspec.size()];
+               _meanList = new KahanObject[mvspec.size()];
+               _countList = new long[mvspec.size()];
+               for(int i=0; i < mvspec.size(); i++) {
+                       JSONObject mvobj = (JSONObject)mvspec.get(i);
+                       _mvMethodList[i] = 
MVMethod.valueOf(mvobj.get("method").toString().toUpperCase()); 
+                       if( _mvMethodList[i] == MVMethod.CONSTANT ) {
+                               _replacementList[i] = 
mvobj.getString("value").toString();
+                       }
+                       _meanList[i] = new KahanObject(0, 0);
+               }
+       }
+       
+       
        public void prepare(String[] words) throws IOException {
                
                try {
@@ -242,13 +265,13 @@ public class MVImputeAgent extends Encoder
                                if(!TfUtils.isNA(_NAstrings, w)) {
                                        _countList[i]++;
                                        
-                                       boolean computeMean = (_mvMethodList[i] 
== 1 || _isMVScaled.get(i) );
+                                       boolean computeMean = (_mvMethodList[i] 
== MVMethod.GLOBAL_MEAN || _isMVScaled.get(i) );
                                        if(computeMean) {
                                                // global_mean
                                                double d = 
UtilFunctions.parseToDouble(w);
                                                _meanFn.execute2(_meanList[i], 
d, _countList[i]);
                                                
-                                               if (_isMVScaled.get(i) && 
_mvscMethodList[i] == 2)
+                                               if (_isMVScaled.get(i) && 
_mvscMethodList[i] == MVMethod.GLOBAL_MODE)
                                                        
_varFn.execute(_varList[i], d);
                                        }
                                        else {
@@ -271,7 +294,7 @@ public class MVImputeAgent extends Encoder
                                double d = UtilFunctions.parseToDouble(w);
                                _scnomvCountList[i]++;          // not 
required, this is always equal to total #records processed
                                _meanFn.execute2(_scnomvMeanList[i], d, 
_scnomvCountList[i]);
-                               if(_scnomvMethodList[i] == 2)
+                               if(_scnomvMethodList[i] == MVMethod.GLOBAL_MODE)
                                        _varFn.execute(_scnomvVarList[i], d);
                        }
                } catch(Exception e) {
@@ -311,15 +334,15 @@ public class MVImputeAgent extends Encoder
        
        private DistinctValue prepMeanOutput(int taskID, int idx, StringBuilder 
sb, boolean scnomv) throws CharacterCodingException {
                
-               byte mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
+               MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
                
-               if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+               if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || 
_isMVScaled.get(idx) ) {
                        String suffix = null;
                        if(scnomv)
                                suffix = "scnomv";
-                       else if ( mthd ==1 && _isMVScaled.get(idx) )
+                       else if ( mthd == MVMethod.GLOBAL_MEAN && 
_isMVScaled.get(idx) )
                                suffix = "scmv";        // both scaled and mv 
imputed
-                       else if ( mthd == 1 )
+                       else if ( mthd == MVMethod.GLOBAL_MEAN )
                                suffix = "noscmv";
                        else
                                suffix = "scnomv";
@@ -341,8 +364,8 @@ public class MVImputeAgent extends Encoder
        }
        
        private DistinctValue prepMeanCorrectionOutput(int taskID, int idx, 
StringBuilder sb, boolean scnomv) throws CharacterCodingException {
-               byte mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
-               if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+               MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
+               if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || 
_isMVScaled.get(idx) ) {
                        sb.setLength(0);
                        //CORRECTION_PREFIX + "_" + taskID + "_" + 
Double.toString(mean._correction);
                        sb.append(CORRECTION_PREFIX);
@@ -357,8 +380,8 @@ public class MVImputeAgent extends Encoder
        }
        
        private DistinctValue prepMeanCountOutput(int taskID, int idx, 
StringBuilder sb, boolean scnomv) throws CharacterCodingException {
-               byte mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
-               if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+               MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
+               if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || 
_isMVScaled.get(idx) ) {
                        sb.setLength(0);
                        //s = COUNT_PREFIX + "_" + taskID + "_" + 
Long.toString(count);
                        sb.append(COUNT_PREFIX);
@@ -373,8 +396,8 @@ public class MVImputeAgent extends Encoder
        }
        
        private DistinctValue prepTotalCountOutput(int taskID, int idx, 
StringBuilder sb, boolean scnomv, TfUtils agents) throws 
CharacterCodingException {
-               byte mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
-               if ( scnomv || mthd == 1 || _isMVScaled.get(idx) ) {
+               MVMethod mthd = (scnomv ? _scnomvMethodList[idx] : 
_mvMethodList[idx]);
+               if ( scnomv || mthd == MVMethod.GLOBAL_MEAN || 
_isMVScaled.get(idx) ) {
                        sb.setLength(0);
                        //TOTAL_COUNT_PREFIX + "_" + taskID + "_" + 
Long.toString(TransformationAgent._numValidRecords);
                        sb.append(TOTAL_COUNT_PREFIX);
@@ -390,8 +413,8 @@ public class MVImputeAgent extends Encoder
        private DistinctValue prepConstantOutput(int idx, StringBuilder sb) 
throws CharacterCodingException {
                if ( _mvMethodList == null )
                        return null;
-               byte mthd = _mvMethodList[idx];
-               if ( mthd == 3 ) {
+               MVMethod mthd = _mvMethodList[idx];
+               if ( mthd == MVMethod.CONSTANT ) {
                        sb.setLength(0);
                        sb.append(CONSTANT_PREFIX);
                        sb.append("_");
@@ -402,7 +425,7 @@ public class MVImputeAgent extends Encoder
        }
        
        private DistinctValue prepVarOutput(int taskID, int idx, StringBuilder 
sb, boolean scnomv) throws CharacterCodingException {
-               if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == 
2 ) {
+               if ( scnomv || _isMVScaled.get(idx) && _mvscMethodList[idx] == 
MVMethod.GLOBAL_MODE ) {
                        sb.setLength(0);
                        sb.append(VARIANCE_PREFIX);
                        sb.append("_");
@@ -560,7 +583,7 @@ public class MVImputeAgent extends Encoder
                                        
                                        double imputedValue = Double.NaN;
                                        KahanObject gmean = null;
-                                       if ( _mvMethodList[i] == 1 ) 
+                                       if ( _mvMethodList[i] == 
MVMethod.GLOBAL_MEAN ) 
                                        {
                                                gmean = _meanList[i];
                                                imputedValue = 
_meanList[i]._sum;
@@ -568,7 +591,7 @@ public class MVImputeAgent extends Encoder
                                                double mean = ( _countList[i] 
== 0 ? 0.0 : _meanList[i]._sum); 
                                                writeTfMtd(colID, 
Double.toString(mean), outputDir, fs, agents);
                                        }
-                                       else if ( _mvMethodList[i] == 3 ) 
+                                       else if ( _mvMethodList[i] == 
MVMethod.CONSTANT ) 
                                        {
                                                writeTfMtd(colID, 
_replacementList[i], outputDir, fs, agents);
                                                
@@ -584,7 +607,7 @@ public class MVImputeAgent extends Encoder
                                        if ( _isMVScaled.get(i) ) 
                                        {
                                                double sdev = -1.0;
-                                               if ( _mvscMethodList[i] == 2 ) {
+                                               if ( _mvscMethodList[i] == 
MVMethod.GLOBAL_MODE ) {
                                                        // Adjust variance with 
missing values
                                                        long totalMissingCount 
= (agents.getValid() - _countList[i]);
                                                        
_varFn.execute(_varList[i], imputedValue, totalMissingCount);
@@ -601,7 +624,7 @@ public class MVImputeAgent extends Encoder
                                        int colID = _scnomvList[i];
                                        double mean = (_scnomvCountList[i] == 0 
? 0.0 : _scnomvMeanList[i]._sum);
                                        double sdev = -1.0;
-                                       if ( _scnomvMethodList[i] == 2 ) 
+                                       if ( _scnomvMethodList[i] == 
MVMethod.GLOBAL_MODE ) 
                                        {
                                                double var = 
_scnomvVarList[i].getRequiredResult(new CMOperator(_varFn, 
AggregateOperationTypes.VARIANCE));
                                                sdev = Math.sqrt(var);
@@ -788,7 +811,7 @@ public class MVImputeAgent extends Encoder
                                        // since missing values themselves are 
replaced with gmean.
                                        long totalMissingCount = 
(totalRecordCount-totalValidCount);
                                        int idx = isApplicable(colID);
-                                       if(idx != -1 && _mvMethodList[idx] == 
3) 
+                                       if(idx != -1 && _mvMethodList[idx] == 
MVMethod.CONSTANT) 
                                                _meanFn.execute(gmean, 
UtilFunctions.parseToDouble(_replacementList[idx]), totalRecordCount);
                                        _varFn.execute(gcm, gmean._sum, 
totalMissingCount);
                                }
@@ -863,10 +886,10 @@ public class MVImputeAgent extends Encoder
                                for(int i=0; i<_colList.length;i++) {
                                        int colID = _colList[i];
                                        
-                                       if ( _mvMethodList[i] == 1 || 
_mvMethodList[i] == 2 )
+                                       if ( _mvMethodList[i] == 
MVMethod.GLOBAL_MEAN || _mvMethodList[i] == MVMethod.GLOBAL_MODE )
                                                // global_mean or global_mode
                                                _replacementList[i] = 
readReplacement(colID, fs, tfMtdDir, agents);
-                                       else if ( _mvMethodList[i] == 3 ) {
+                                       else if ( _mvMethodList[i] == 
MVMethod.CONSTANT ) {
                                                // constant: replace a missing 
value by a given constant
                                                // nothing to do. The constant 
values are loaded already during configure 
                                        }
@@ -894,15 +917,8 @@ public class MVImputeAgent extends Encoder
                int idx = isApplicable(colID);          
                if(idx == -1)
                        return MVMethod.INVALID;
-               
-               switch(_mvMethodList[idx])
-               {
-                       case 1: return MVMethod.GLOBAL_MEAN;
-                       case 2: return MVMethod.GLOBAL_MODE;
-                       case 3: return MVMethod.CONSTANT;
-                       default: return MVMethod.INVALID;
-               }
-               
+               else
+                       return _mvMethodList[idx];
        }
        
        public long getNonMVCount(int colID) {
@@ -917,14 +933,48 @@ public class MVImputeAgent extends Encoder
        
        @Override
        public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
-               // TODO Auto-generated method stub
-               return null;
+               build(in);
+               return apply(in, out);
        }
        
        @Override
        public void build(FrameBlock in) {
-               // TODO Auto-generated method stub
-               
+               try {
+                       for( int j=0; j<_colList.length; j++ ) {
+                               int colID = _colList[j];
+                               if( _mvMethodList[j] == MVMethod.GLOBAL_MEAN ) {
+                                       //compute global column mean (scale)
+                                       long off = _countList[j];
+                                       for( int i=0; i<in.getNumRows(); i++ )
+                                               _meanFn.execute2(_meanList[j], 
UtilFunctions.objectToDouble(
+                                                       
in.getSchema().get(colID-1), in.get(i, colID-1)), off+i+1);
+                                       _replacementList[j] = 
String.valueOf(_meanList[j]._sum);
+                                       _countList[j] += in.getNumRows();
+                               }
+                               else if( _mvMethodList[j] == 
MVMethod.GLOBAL_MODE ) {
+                                       //compute global column mode 
(categorical), i.e., most frequent category
+                                       HashMap<String,Long> hist = 
_hist.containsKey(colID) ? 
+                                                       _hist.get(colID) : new 
HashMap<String,Long>();
+                                       for( int i=0; i<in.getNumRows(); i++ ) {
+                                               String key = 
String.valueOf(in.get(i, colID-1));
+                                               if( key != null && 
!key.isEmpty() ) {
+                                                       Long val = 
hist.get(key);
+                                                       hist.put(key, 
(val!=null) ? val+1 : 1);
+                                               }       
+                                       }
+                                       _hist.put(colID, hist);
+                                       long max = Long.MIN_VALUE; 
+                                       for( Entry<String, Long> e : 
hist.entrySet() ) 
+                                               if( e.getValue() > max  ) {
+                                                       _replacementList[j] = 
e.getKey();
+                                                       max = e.getValue();
+                                               }
+                               }
+                       }
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
        }
 
        @Override
@@ -938,7 +988,7 @@ public class MVImputeAgent extends Encoder
                                        w = words[colID-1] = 
_replacementList[i];
                                
                                if ( _isMVScaled.get(i) )
-                                       if ( _mvscMethodList[i] == 1 )
+                                       if ( _mvscMethodList[i] == 
MVMethod.GLOBAL_MEAN )
                                                words[colID-1] = 
Double.toString( UtilFunctions.parseToDouble(w) - _meanList[i]._sum );
                                        else
                                                words[colID-1] = 
Double.toString( (UtilFunctions.parseToDouble(w) - _meanList[i]._sum) / 
_varList[i].mean._sum );
@@ -948,7 +998,7 @@ public class MVImputeAgent extends Encoder
                for(int i=0; i < _scnomvList.length; i++)
                {
                        int colID = _scnomvList[i];
-                       if ( _scnomvMethodList[i] == 1 )
+                       if ( _scnomvMethodList[i] == MVMethod.GLOBAL_MEAN )
                                words[colID-1] = Double.toString( 
UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum );
                        else
                                words[colID-1] = Double.toString( 
(UtilFunctions.parseToDouble(words[colID-1]) - _scnomvMeanList[i]._sum) / 
_scnomvVarList[i].mean._sum );
@@ -971,8 +1021,11 @@ public class MVImputeAgent extends Encoder
        
        @Override
        public FrameBlock getMetaData(FrameBlock out) {
-               // TODO Auto-generated method stub
-               return null;
+               for( int j=0; j<_colList.length; j++ ) {
+                       out.getColumnMetadata(_colList[j]-1)
+                          .setMvValue(_replacementList[j]);
+               }
+               return out;
        }
        
        /**
@@ -983,14 +1036,13 @@ public class MVImputeAgent extends Encoder
        public void initMetaData(FrameBlock meta) {
                //init replacement lists, replace recoded values to
                //apply mv imputation potentially after recoding
-               _replacementList = new String[_colList.length];
                for( int j=0; j<_colList.length; j++ ) {
-                       int colID = _colList[j];
+                       int colID = _colList[j];        
                        String mvVal = 
UtilFunctions.unquote(meta.getColumnMetadata(colID-1).getMvValue()); 
                        if( _rcList.contains(colID) ) {
                                Long mvVal2 = 
meta.getRecodeMap(colID-1).get(mvVal);
-                               if( mvVal2 == null) 
-                                       throw new RuntimeException("Missing 
recode value for impute value '"+mvVal+"'.");
+                               if( mvVal2 == null)
+                                       throw new RuntimeException("Missing 
recode value for impute value '"+mvVal+"' (colID="+colID+").");
                                _replacementList[j] = mvVal2.toString();
                        }
                        else {
@@ -1006,4 +1058,14 @@ public class MVImputeAgent extends Encoder
        public void initRecodeIDList(List<Integer> rcList) {
                _rcList = rcList;
        }
+       
+       /**
+        * Exposes the internal histogram after build.
+        * 
+        * @param colID
+        * @return
+        */
+       public HashMap<String,Long> getHistogram( int colID ) {
+               return _hist.get(colID);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 01d7c85..5abe9db 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -412,7 +412,7 @@ public class RecodeAgent extends Encoder
                                //probe and build column map
                                HashMap<String,Long> map = _rcdMaps.get(colID);
                                String key = row[colID-1];
-                               if( !map.containsKey(key) )
+                               if( key!=null && !key.isEmpty() && 
!map.containsKey(key) )
                                        map.put(key, new Long(map.size()+1));
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
index bafa655..d6bf9d4 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -78,10 +78,10 @@ public class EncoderComposite extends Encoder
                
                //propagate meta data 
                _meta = new FrameBlock(in.getNumColumns(), ValueType.STRING);
-               for( Encoder encoder : _encoders ) {
-                       encoder.initMetaData(_meta);
+               for( Encoder encoder : _encoders )
                        _meta = encoder.getMetaData(_meta);
-               }
+               for( Encoder encoder : _encoders )
+                       encoder.initMetaData(_meta);
                
                //apply meta data
                for( Encoder encoder : _encoders )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 5e6e5b7..6bce4ff 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -369,7 +369,7 @@ public class UtilFunctions
        public static double objectToDouble(ValueType vt, Object in) {
                if( in == null )  return 0;
                switch( vt ) {
-                       case STRING:  return Double.parseDouble((String)in);
+                       case STRING:  return !((String)in).isEmpty() ? 
Double.parseDouble((String)in) : 0;
                        case BOOLEAN: return ((Boolean)in)?1d:0d;
                        case INT:     return (Long)in;
                        case DOUBLE:  return (Double)in;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
new file mode 100644
index 0000000..b61060b
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeApplyTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.transform;
+
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.io.MatrixReaderFactory;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class TransformFrameEncodeApplyTest extends AutomatedTestBase 
+{
+       private final static String TEST_NAME1 = "TransformFrameEncodeApply";
+       private final static String TEST_DIR = "functions/transform/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
TransformFrameEncodeApplyTest.class.getSimpleName() + "/";
+       
+       //dataset and transform tasks without missing values
+       private final static String DATASET1    = "homes3/homes.csv";
+       private final static String SPEC1               = 
"homes3/homes.tfspec_recode.json"; 
+       private final static String SPEC2               = 
"homes3/homes.tfspec_dummy.json";
+       private final static String SPEC3               = 
"homes3/homes.tfspec_bin.json"; //incl recode
+       
+       //dataset and transform tasks with missing values
+       private final static String DATASET2    = "homes/homes.csv";
+       private final static String SPEC4               = 
"homes3/homes.tfspec_impute.json";
+       private final static String SPEC5               = 
"homes3/homes.tfspec_omit.json";
+       
+       public enum TransformType {
+               RECODE,
+               DUMMY,
+               BIN,
+               IMPUTE,
+               OMIT,
+       }
+       
+       @Override
+       public void setUp()  {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "y" }) );
+       }
+       
+       @Test
+       public void testHomesRecodeSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.RECODE);
+       }
+       
+       @Test
+       public void testHomesRecodeSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.RECODE);
+       }
+       
+       @Test
+       public void testHomesDummycodeSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.DUMMY);
+       }
+       
+       @Test
+       public void testHomesDummycodeSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.DUMMY);
+       }
+       
+       @Test
+       public void testHomesBinningSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.BIN);
+       }
+       
+       @Test
+       public void testHomesBinningSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.BIN);
+       }
+       
+       @Test
+       public void testHomesOmitSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.OMIT);
+       }
+       
+       @Test
+       public void testHomesOmitSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.OMIT);
+       }
+       
+       @Test
+       public void testHomesImputeSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.IMPUTE);
+       }
+       
+       @Test
+       public void testHomesImputeSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.IMPUTE);
+       }
+
+       /**
+        * 
+        * @param rt
+        * @param ofmt
+        * @param dataset
+        */
+       private void runTransformTest( RUNTIME_PLATFORM rt, String ofmt, 
TransformType type )
+       {
+               //set runtime platform
+               RUNTIME_PLATFORM rtold = rtplatform;
+               boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
+               rtplatform = rt;
+
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == 
RUNTIME_PLATFORM.HYBRID_SPARK)
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+               //set transform specification
+               String SPEC = null; String DATASET = null;
+               switch( type ) {
+                       case RECODE: SPEC = SPEC1; DATASET = DATASET1; break;
+                       case DUMMY:  SPEC = SPEC2; DATASET = DATASET1; break;
+                       case BIN:    SPEC = SPEC3; DATASET = DATASET1; break;
+                       case IMPUTE: SPEC = SPEC4; DATASET = DATASET2; break;
+                       case OMIT:   SPEC = SPEC5; DATASET = DATASET2; break;
+               }
+
+               if( !ofmt.equals("csv") )
+                       throw new RuntimeException("Unsupported test output 
format");
+               
+               try
+               {
+                       getAndLoadTestConfiguration(TEST_NAME1);
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[]{"-explain","-nvargs", 
+                               "DATA=" + HOME + "input/" + DATASET,
+                               "TFSPEC=" + HOME + "input/" + SPEC,
+                               "TFDATA1=" + output("tfout1"),
+                               "TFDATA2=" + output("tfout2"),
+                               "OFMT=" + ofmt };
+       
+                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
+                       runTest(true, false, null, -1); 
+                       
+                       //read input/output and compare
+                       double[][] R1 = 
DataConverter.convertToDoubleMatrix(MatrixReaderFactory
+                               .createMatrixReader(InputInfo.CSVInputInfo)
+                               .readMatrixFromHDFS(output("tfout1"), -1L, -1L, 
1000, 1000, -1));
+                       double[][] R2 = 
DataConverter.convertToDoubleMatrix(MatrixReaderFactory
+                               .createMatrixReader(InputInfo.CSVInputInfo)
+                               .readMatrixFromHDFS(output("tfout2"), -1L, -1L, 
1000, 1000, -1));
+                       TestUtils.compareMatrices(R1, R2, R1.length, 
R1[0].length, 0);                  
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       rtplatform = rtold;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml 
b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
new file mode 100644
index 0000000..08c98d0
--- /dev/null
+++ b/src/test/scripts/functions/transform/TransformFrameEncodeApply.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+F1 = read($DATA, data_type="frame", format="csv");
+
+jspec = read($TFSPEC, data_type="scalar", value_type="string");
+
+[X, M] = transformencode(target=F1, spec=jspec);
+
+if(1==1){}
+
+X2 = transformapply(target=F1, spec=jspec, meta=M);
+
+write(X, $TFDATA1, format=$OFMT);
+write(X2, $TFDATA2, format=$OFMT);
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172bfcac/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
index 5122a60..bdcc36b 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/transform/ZPackageSuite.java
@@ -31,6 +31,7 @@ import org.junit.runners.Suite;
        TransformAndApplyTest.class,
        TransformEncodeDecodeTest.class,
        TransformFrameApplyTest.class,
+       TransformFrameEncodeApplyTest.class,
        TransformFrameEncodeDecodeTest.class,
        TransformReadMetaTest.class,
        TransformTest.class,


Reply via email to