[SYSTEMML-613] Extended transform encoder api and cleanup (dep spark)

This patch generalizes the transform encoder API in order to support
transformencode over frames as well. Furthermore, it cleans up the
existing agents (recode, dummycode, etc.) in order to isolate Scala
dependencies in the Spark backend.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/17b52217
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/17b52217
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/17b52217

Branch: refs/heads/master
Commit: 17b52217cd05c56efdd96580c44700ee28a5fa9c
Parents: 8e8283b
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 7 13:29:13 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Apr 7 13:30:30 2016 -0700

----------------------------------------------------------------------
 .../sysml/runtime/transform/BinAgent.java       |  44 ++++++--
 .../sysml/runtime/transform/DummycodeAgent.java |  32 ++++++
 .../sysml/runtime/transform/GenTfMtdSPARK.java  |  20 +++-
 .../sysml/runtime/transform/MVImputeAgent.java  |  37 +++++-
 .../sysml/runtime/transform/OmitAgent.java      |  27 +++++
 .../sysml/runtime/transform/RecodeAgent.java    | 102 ++++++++++++++++-
 .../sysml/runtime/transform/encode/Encoder.java |  43 +++++++
 .../transform/encode/EncoderComposite.java      | 113 +++++++++++++++++++
 .../transform/encode/EncoderFactory.java        |  77 +++++++++++++
 9 files changed, 474 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
index 4f24afd..35bfd5a 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -152,7 +152,7 @@ public class BinAgent extends Encoder
                }
        }
        
-       public ArrayList<Tuple2<Integer, DistinctValue>> 
mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, 
DistinctValue>> list, TfUtils agents) throws IOException {
+       public ArrayList<Pair<Integer, DistinctValue>> 
mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, 
DistinctValue>> list, TfUtils agents) throws IOException {
                if ( !isApplicable() )
                        return list;
                
@@ -161,9 +161,9 @@ public class BinAgent extends Encoder
                                int colID = _colList[i];
                                Integer iw = -colID;
                                
-                               list.add( new Tuple2<Integer,DistinctValue>(iw, 
prepMinOutput(i)) );
-                               list.add( new Tuple2<Integer,DistinctValue>(iw, 
prepMaxOutput(i)) );
-                               list.add( new Tuple2<Integer,DistinctValue>(iw, 
prepNBinsOutput(i)) );
+                               list.add( new Pair<Integer,DistinctValue>(iw, 
prepMinOutput(i)) );
+                               list.add( new Pair<Integer,DistinctValue>(iw, 
prepMaxOutput(i)) );
+                               list.add( new Pair<Integer,DistinctValue>(iw, 
prepNBinsOutput(i)) );
                        }
                } catch(Exception e) {
                        throw new IOException(e);
@@ -317,4 +317,34 @@ public class BinAgent extends Encoder
                
                return words;
        }
+
+       @Override
+       public double[] encode(String[] in, double[] out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public void build(String[] in) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
index 9ceb368..cc51a51 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java
@@ -41,6 +41,8 @@ import org.apache.wink.json4j.JSONObject;
 import com.google.common.base.Functions;
 import com.google.common.collect.Ordering;
 
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -393,4 +395,34 @@ public class DummycodeAgent extends Encoder
                
                return nwords;
        }
+
+       @Override
+       public double[] encode(String[] in, double[] out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public void build(String[] in) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               // TODO Auto-generated method stub
+               
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java 
b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
index e6bb08c..a56f6ef 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,6 +43,7 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.Pair;
 
 public class GenTfMtdSPARK 
 {
@@ -129,17 +131,17 @@ public class GenTfMtdSPARK
                        
                        // Prepare the output in the form of DistinctValues, 
which subsequently need to be grouped and aggregated. 
                        
-                       ArrayList<Tuple2<Integer,DistinctValue>> outList = new 
ArrayList<Tuple2<Integer,DistinctValue>>();
+                       ArrayList<Pair<Integer,DistinctValue>> outList = new 
ArrayList<Pair<Integer,DistinctValue>>();
                        
                        
_agents.getMVImputeAgent().mapOutputTransformationMetadata(partitionID, 
outList, _agents);
                        
_agents.getRecodeAgent().mapOutputTransformationMetadata(partitionID, outList, 
_agents);
                        
_agents.getBinAgent().mapOutputTransformationMetadata(partitionID, outList, 
_agents);
                        
                        DistinctValue dv = new DistinctValue(new 
OffsetCount("Partition"+partitionID, _offsetInPartFile, _agents.getTotal()));
-                       Tuple2<Integer, DistinctValue> tuple = new 
Tuple2<Integer, DistinctValue>((int) (_agents.getNumCols()+1), dv); 
+                       Pair<Integer, DistinctValue> tuple = new Pair<Integer, 
DistinctValue>((int) (_agents.getNumCols()+1), dv); 
                        outList.add(tuple);
 
-                       return outList.iterator();
+                       return toTuple2List(outList).iterator();
                }
                
        }
@@ -211,4 +213,16 @@ public class GenTfMtdSPARK
                        return numRows;
                }
        }
+       
+       /**
+        * 
+        * @param in
+        * @return
+        */
+       public static List<Tuple2<Integer,DistinctValue>> 
toTuple2List(List<Pair<Integer,DistinctValue>> in) {
+               ArrayList<Tuple2<Integer,DistinctValue>> ret = new 
ArrayList<Tuple2<Integer,DistinctValue>>();
+               for( Pair<Integer,DistinctValue> e : in )
+                       ret.add(new Tuple2<Integer,DistinctValue>(e.getKey(), 
e.getValue()));
+               return ret;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
index 5cadef6..76419e6 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java
@@ -38,15 +38,15 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
-
-import scala.Tuple2;
-
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.functionobjects.CM;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.functionobjects.Mean;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.matrix.operators.CMOperator;
 import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
 import org.apache.sysml.runtime.transform.encode.Encoder;
@@ -468,12 +468,12 @@ public class MVImputeAgent extends Encoder
         * @throws IOException
         */
        
-       private void addDV(Integer iw, DistinctValue dv, 
ArrayList<Tuple2<Integer, DistinctValue>> list) throws IOException {
+       private void addDV(Integer iw, DistinctValue dv, 
ArrayList<Pair<Integer, DistinctValue>> list) throws IOException {
                if ( dv != null )       
-                       list.add( new Tuple2<Integer, DistinctValue>(iw, dv) ); 
+                       list.add( new Pair<Integer, DistinctValue>(iw, dv) );   
        }
 
-       public ArrayList<Tuple2<Integer, DistinctValue>> 
mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, 
DistinctValue>> list, TfUtils agents) throws IOException {
+       public ArrayList<Pair<Integer, DistinctValue>> 
mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, 
DistinctValue>> list, TfUtils agents) throws IOException {
                try { 
                        StringBuilder sb = new StringBuilder();
                        DistinctValue dv = null;
@@ -939,4 +939,29 @@ public class MVImputeAgent extends Encoder
                int idx = isApplicable(colID);          
                return (idx == -1) ? null : _replacementList[idx];
        }
+       @Override
+       public double[] encode(String[] in, double[] out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+       @Override
+       public void build(String[] in) {
+               // TODO Auto-generated method stub
+               
+       }
+       @Override
+       public void build(FrameBlock in) {
+               // TODO Auto-generated method stub
+               
+       }
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               // TODO Auto-generated method stub
+               return null;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
index c169129..a4eb715 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/OmitAgent.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.transform.encode.Encoder;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -89,5 +91,30 @@ public class OmitAgent extends Encoder
        public String[] apply(String[] words) {
                return null;
        }
+
+       @Override
+       public double[] encode(String[] in, double[] out) {
+               return null;
+       }
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               return null;
+       }
+
+       @Override
+       public void build(String[] in) {
+               
+       }
+
+       @Override
+       public void build(FrameBlock in) {      
+       
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               return null;
+       }
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 1f9749d..49961a2 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,12 +39,11 @@ import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
-import scala.Tuple2;
-
 import com.google.common.collect.Ordering;
 
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.transform.decode.DecoderRecode;
@@ -182,12 +182,12 @@ public class RecodeAgent extends Encoder
                mapOutputHelper(taskID, out, null, agents);
        }
        
-       public ArrayList<Tuple2<Integer, DistinctValue>> 
mapOutputTransformationMetadata(int taskID, ArrayList<Tuple2<Integer, 
DistinctValue>> list, TfUtils agents) throws IOException {
+       public ArrayList<Pair<Integer, DistinctValue>> 
mapOutputTransformationMetadata(int taskID, ArrayList<Pair<Integer, 
DistinctValue>> list, TfUtils agents) throws IOException {
                mapOutputHelper(taskID, null, list, agents);
                return list;
        }
        
-       public void mapOutputHelper(int taskID, OutputCollector<IntWritable, 
DistinctValue> out, ArrayList<Tuple2<Integer, DistinctValue>> list, TfUtils 
agents) throws IOException {
+       public void mapOutputHelper(int taskID, OutputCollector<IntWritable, 
DistinctValue> out, ArrayList<Pair<Integer, DistinctValue>> list, TfUtils 
agents) throws IOException {
                if ( _colList == null  && _mvrcdList == null )
                        return;
                
@@ -209,7 +209,7 @@ public class RecodeAgent extends Encoder
                                        }
                                        else if ( list != null ) {
                                                for(String s : map.keySet()) 
-                                                       list.add(new 
Tuple2<Integer,DistinctValue>(colID, new DistinctValue(s, map.get(s))) );
+                                                       list.add(new 
Pair<Integer,DistinctValue>(colID, new DistinctValue(s, map.get(s))) );
                                        }
                                }
                        }
@@ -441,5 +441,97 @@ public class RecodeAgent extends Encoder
                        return (tmp!=null) ? Long.toString(tmp) : null;
                }
        }
+
+       @Override
+       public double[] encode(String[] in, double[] out) {
+               if( !isApplicable() )
+                       return out;
+               
+               //build and apply recode maps
+               build(in);
+               apply(in);
+               
+               //convert to double 
+               for( int j=0; j<_colList.length; j++ )
+                       out[_colList[j]-1] = 
Double.parseDouble(in[_colList[j]-1]);             
+               return out;
+       }
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               if( !isApplicable() )
+                       return out;
+               
+               //build recode maps 
+               build(in);
+               
+               //apply created recode maps
+               Iterator<String[]> iter = in.getStringRowIterator();
+               for( int i=0; iter.hasNext(); i++ ) {
+                       String[] row = apply( iter.next() );
+                       for( int j=0; j<_colList.length; j++ ) {
+                               double val = 
Double.parseDouble(row[_colList[j]-1]);
+                               out.quickSetValue(i, _colList[j]-1, val);
+                       }       
+               }
+                       
+               return out;
+       }
+
+       @Override
+       public void build(String[] in) {
+               if( !isApplicable() )
+                       return;
+               
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       //allocate column map if necessary
+                       if( !_rcdMaps.containsKey(colID) ) 
+                               _rcdMaps.put(colID, new HashMap<String,Long>());
+                       //probe and build column map
+                       HashMap<String,Long> map = _rcdMaps.get(colID);
+                       String key = in[colID-1];
+                       if( !map.containsKey(key) )
+                               map.put(key, new Long(map.size()+1));
+               }
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               if( !isApplicable() )
+                       return;         
+               
+               Iterator<String[]> iter = in.getStringRowIterator();
+               while( iter.hasNext() )
+                       build( iter.next() );
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               if( !isApplicable() )
+                       return out;
+               
+               //inverse operation to initRecodeMaps
+               
+               //allocate output rows
+               int maxDistinct = 0;
+               for( int j=0; j<_colList.length; j++ )
+                       if( _rcdMaps.containsKey(_colList[j]) )
+                               maxDistinct = Math.max(maxDistinct, 
_rcdMaps.get(_colList[j]).size());
+               out.ensureAllocatedColumns(maxDistinct);
+               
+               //create compact meta data representation
+               for( int j=0; j<_colList.length; j++ ) {
+                       int colID = _colList[j]; //1-based
+                       int rowID = 0;
+                       if( _rcdMaps.containsKey(_colList[j]) )
+                               for( Entry<String, Long> e : 
_rcdMaps.get(colID).entrySet() ) {
+                                       String tmp = e.getKey() + 
Lop.DATATYPE_PREFIX + e.getValue().toString();
+                                       out.set(rowID++, colID-1, tmp); 
+                               }
+               }
+               
+               return out;
+       }
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
index 32694c9..481832b 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/Encoder.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.transform.DistinctValue;
 import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -87,6 +89,47 @@ public abstract class Encoder implements Serializable
        }
        
        /**
+        * Row encode: build and apply (transform encode).
+        * 
+        * @param in input row of string values
+        * @param out output row receiving the encoded values
+        * @return the encoded output row
+        */
+       public abstract double[] encode(String[] in, double[] out);
+       
+       /**
+        * Block encode: build and apply (transform encode).
+        * 
+        * @param in input frame block
+        * @param out output matrix block receiving the encoded values
+        * @return the encoded output matrix block
+        */
+       public abstract MatrixBlock encode(FrameBlock in, MatrixBlock out);
+
+       /**
+        * Build the transform meta data for given row input. This call modifies
+        * and keeps meta data as encoder state.
+        * 
+        * @param in input row of string values
+        */
+       public abstract void build(String[] in);
+       
+       /**
+        * Build the transform meta data for the given block input. This call 
modifies
+        * and keeps meta data as encoder state.
+        * 
+        * @param in input frame block
+        */
+       public abstract void build(FrameBlock in);
+       
+       /**
+        * Construct a frame block out of the transform meta data.
+        * @param out frame block into which the meta data is written
+        * @return the frame block holding the meta data
+        */
+       public abstract FrameBlock getMetaData(FrameBlock out);
+       
+       /**
         * Encode input data according to existing transform meta
         * data (transform apply).
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
new file mode 100644
index 0000000..3dc11df
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderComposite.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.transform.DistinctValue;
+import org.apache.sysml.runtime.transform.TfUtils;
+
+/**
+ * Simple composite encoder that applies a list of encoders 
+ * in specified order. By implementing the default encoder API
+ * it can be used as a drop-in replacement for any other encoder. 
+ * 
+ */
+public class EncoderComposite extends Encoder
+{
+       private static final long serialVersionUID = -8473768154646831882L;
+       
+       private List<Encoder> _encoders = null;
+       
+       protected EncoderComposite(List<Encoder> encoders) {
+               super(null);
+               _encoders = encoders;
+       }
+       
+       protected EncoderComposite(Encoder[] encoders) {
+               super(null);
+               _encoders = Arrays.asList(encoders);
+       }
+
+       @Override
+       public double[] encode(String[] in, double[] out) {
+               for( Encoder encoder : _encoders )
+                       encoder.encode(in, out);
+               return out;
+       }
+
+       @Override
+       public MatrixBlock encode(FrameBlock in, MatrixBlock out) {
+               for( Encoder encoder : _encoders )
+                       encoder.encode(in, out);
+               return out;
+       }
+
+       @Override
+       public void build(String[] in) {
+               for( Encoder encoder : _encoders )
+                       encoder.build(in);              
+       }
+
+       @Override
+       public void build(FrameBlock in) {
+               for( Encoder encoder : _encoders )
+                       encoder.build(in);
+       }
+
+       @Override
+       public FrameBlock getMetaData(FrameBlock out) {
+               for( Encoder encoder : _encoders )
+                       encoder.getMetaData(out);
+               return out;
+       }
+       
+       @Override
+       public String[] apply(String[] in) {
+               for( Encoder encoder : _encoders )
+                       encoder.apply(in);
+               return in;
+       }
+
+       @Override
+       public void 
mapOutputTransformationMetadata(OutputCollector<IntWritable, DistinctValue> 
out, int taskID, TfUtils agents) throws IOException {
+               throw new RuntimeException("File-based api not supported.");
+       }
+
+       @Override
+       public void 
mergeAndOutputTransformationMetadata(Iterator<DistinctValue> values, String 
outputDir, int colID, FileSystem fs, TfUtils agents) throws IOException {
+               throw new RuntimeException("File-based api not supported.");    
+       }
+
+       @Override
+       public void loadTxMtd(JobConf job, FileSystem fs, Path txMtdDir, 
TfUtils agents) throws IOException {
+               throw new RuntimeException("File-based api not supported.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/17b52217/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
new file mode 100644
index 0000000..cb5b02e
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.transform.encode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.transform.RecodeAgent;
+import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.wink.json4j.JSONObject;
+
+public class EncoderFactory 
+{
+       /**
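+        * Creates an encoder for clen input columns, all of value type STRING.
+        * @param spec transform specification as a JSON string
+        * @param clen number of input columns
+        * @return composite encoder of all encoders requested by the spec
+        * @throws DMLRuntimeException if the spec cannot be parsed or an encoder cannot be created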
+        */
+       public static Encoder createEncoder(String spec, int clen) throws 
DMLRuntimeException {
+               return createEncoder(spec, Collections.nCopies(clen, 
ValueType.STRING));
+       }
+       
+       
+       /**
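+        * Creates a composite encoder (currently recode only) for the given spec.
+        * @param spec transform specification as a JSON string
+        * @param schema value types of the input columns (not yet used)
+        * @return composite encoder of all encoders requested by the spec
+        * @throws DMLRuntimeException if the spec cannot be parsed or an encoder cannot be created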
+        */
+       public static Encoder createEncoder(String spec, List<ValueType> 
schema) 
+               throws DMLRuntimeException 
+       {       
+               Encoder encoder = null;
+               
+               try {
+                       //parse transform specification
+                       JSONObject jSpec = new JSONObject(spec);
+                       List<Encoder> lencoders = new ArrayList<Encoder>();
+               
+                       //create encoder 'recode'
+                       if ( jSpec.containsKey(TfUtils.TXMETHOD_RECODE))  {
+                               lencoders.add(new RecodeAgent(jSpec));
+                       }
+                       
+                       //create composite encoder of all created encoders
+                       encoder = new EncoderComposite(lencoders);
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+               
+               return encoder;
+       }
+}

Reply via email to