[SYSTEMML-555] New transformapply builtin function over cp frames

This patch introduces a new transformapply builtin function over
in-memory (CP) frames that consumes both the input data and the transform
metadata as frames. To allow for a smooth transition, it temporarily
coexists with, and duplicates, the apply functionality of the existing
transform builtin. Once we have full bufferpool integration and
distributed operations for transformapply, we will remove the old apply
functionality from transform. Note that, for now, transformapply is
limited to recoding. Finally, this change also includes a fix for frames
(correct maintenance of the number of rows) as well as a transformapply
JMLC test.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e69a1c26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e69a1c26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e69a1c26

Branch: refs/heads/master
Commit: e69a1c2613892fea614435c26e75b38d8b0d67f4
Parents: 94dbca0
Author: Matthias Boehm <[email protected]>
Authored: Sat Mar 12 15:37:00 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Mar 12 15:37:00 2016 -0800

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java    |   4 +-
 .../sysml/hops/ParameterizedBuiltinOp.java      |  22 +++-
 .../apache/sysml/lops/ParameterizedBuiltin.java |  65 +++++-----
 .../org/apache/sysml/parser/DMLTranslator.java  |   8 ++
 .../org/apache/sysml/parser/Expression.java     |   2 +-
 .../ParameterizedBuiltinFunctionExpression.java |  41 ++++++-
 .../instructions/CPInstructionParser.java       |   1 +
 .../cp/ParameterizedBuiltinCPInstruction.java   |  29 +++--
 .../sysml/runtime/matrix/data/FrameBlock.java   |  17 ++-
 .../sysml/runtime/transform/DataTransform.java  |  44 +++++++
 .../sysml/runtime/transform/RecodeAgent.java    |  31 ++++-
 .../apache/sysml/runtime/transform/TfUtils.java | 119 ++++++++++---------
 .../functions/jmlc/FrameTransformTest.java      |  56 +++++++--
 src/test/scripts/functions/jmlc/transform.dml   |   5 +-
 14 files changed, 322 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java 
b/src/main/java/org/apache/sysml/hops/Hop.java
index a468ed7..1385995 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -1089,7 +1089,8 @@ public abstract class Hop
        };
 
        public enum ParamBuiltinOp {
-               INVALID, CDF, INVCDF, GROUPEDAGG, RMEMPTY, REPLACE, REXPAND, 
TRANSFORM
+               INVALID, CDF, INVCDF, GROUPEDAGG, RMEMPTY, REPLACE, REXPAND, 
+               TRANSFORM, TRANSFORMAPPLY
        };
 
        /**
@@ -1330,6 +1331,7 @@ public abstract class Hop
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REPLACE, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REPLACE);
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.REXPAND, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.REXPAND);
                HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORM, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORM);
+               HopsParameterizedBuiltinLops.put(ParamBuiltinOp.TRANSFORMAPPLY, 
org.apache.sysml.lops.ParameterizedBuiltin.OperationTypes.TRANSFORMAPPLY);      
        
        }
 
        protected static final HashMap<Hop.OpOp2, String> HopsOpOp2String;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java 
b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 3a8445f..cf5206d 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -217,6 +217,15 @@ public class ParameterizedBuiltinOp extends Hop implements 
MultiThreadedHop
                        pbilop.getOutputParameters().setFormat(Format.CSV);
                        setLops(pbilop);
                }
+               else if ( _op == ParamBuiltinOp.TRANSFORMAPPLY ) 
+               {
+                       ExecType et = optFindExecType();                        
+                       ParameterizedBuiltin pbilop = new 
ParameterizedBuiltin(inputlops,
+                                       HopsParameterizedBuiltinLops.get(_op), 
getDataType(), getValueType(), et);
+                       setOutputDimensions(pbilop);
+                       setLineNumbers(pbilop);
+                       setLops(pbilop);
+               }
 
                //add reblock/checkpoint lops if necessary
                constructAndSetLopsDataFlowProperties();
@@ -979,12 +988,13 @@ public class ParameterizedBuiltinOp extends Hop 
implements MultiThreadedHop
                }
                else 
                {
-                       if( _op == ParamBuiltinOp.TRANSFORM )
-                       {
-                               // force remote execution type here.
-                               // At runtime, cp-side transform is triggered 
for small files.
+                       if( _op == ParamBuiltinOp.TRANSFORM ) {
+                               // force remote, at runtime cp transform 
triggered for small files.
                                return REMOTE;
                        }
+                       else if( _op == ParamBuiltinOp.TRANSFORMAPPLY ) {
+                               return ExecType.CP;
+                       }
                        
                        if ( OptimizerUtils.isMemoryBasedOptLevel() ) {
                                _etype = findExecTypeByMemEstimate();
@@ -1081,6 +1091,10 @@ public class ParameterizedBuiltinOp extends Hop 
implements MultiThreadedHop
                                
                                break;  
                        }
+                       case TRANSFORMAPPLY: {
+                               Hop target = 
getInput().get(_paramIndexMap.get("target"));
+                               setDim1( target.getDim1() ); //rows remain 
unchanged
+                       }
                        default:
                                //do nothing
                                break;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java 
b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
index 17ec274..4a8ae72 100644
--- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
+++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.lops;
 
 import java.util.HashMap;
+import java.util.Map.Entry;
 
 import org.apache.sysml.hops.HopsException;
 import org.apache.sysml.lops.LopProperties.ExecLocation;
@@ -40,7 +41,7 @@ public class ParameterizedBuiltin extends Lop
        public enum OperationTypes { 
                INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, 
                PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, PEXP, QEXP,
-               TRANSFORM
+               TRANSFORM, TRANSFORMAPPLY,
        };
        
        private OperationTypes _operation;
@@ -211,24 +212,12 @@ public class ParameterizedBuiltin extends Lop
                                
                                break;
                        
-                       case REPLACE:
+                       case REPLACE: {
                                sb.append( "replace" );
                                sb.append( OPERAND_DELIMITOR );
-                               
-                               for ( String s : _inputParams.keySet() ) 
-                               {       
-                                       sb.append( s );
-                                       sb.append( NAME_VALUE_SEPARATOR );
-                                       
-                                       // get the value/label of the scalar 
input associated with name "s"
-                                       Lop iLop = _inputParams.get(s);
-                                       if( s.equals("target") )
-                                               
sb.append(iLop.getOutputParameters().getLabel());
-                                       else
-                                               sb.append( 
iLop.prepScalarLabel() );
-                                       sb.append( OPERAND_DELIMITOR );
-                               }
+                               sb.append(compileGenericParamMap(_inputParams));
                                break;
+                       }
                        
                        case REXPAND:
                                sb.append("rexpand");
@@ -252,23 +241,16 @@ public class ParameterizedBuiltin extends Lop
                                
                                break;
                                
-                       case TRANSFORM:
-                       {
+                       case TRANSFORM: {
                                sb.append("transform");
                                sb.append(OPERAND_DELIMITOR);
-                               
-                               for ( String s : _inputParams.keySet() ) {
-                                       sb.append(s);
-                                       sb.append(NAME_VALUE_SEPARATOR);
-                                       
-                                       Lop iLop = _inputParams.get(s);
-                                       if( iLop.getDataType() != 
DataType.SCALAR )
-                                               sb.append( 
iLop.getOutputParameters().getLabel());
-                                       else
-                                               sb.append( 
iLop.prepScalarLabel() );
-                                       
-                                       sb.append(OPERAND_DELIMITOR);
-                               }
+                               sb.append(compileGenericParamMap(_inputParams));
+                               break;
+                       }                       
+                       case TRANSFORMAPPLY: {
+                               sb.append("transformapply");
+                               sb.append(OPERAND_DELIMITOR);
+                               sb.append(compileGenericParamMap(_inputParams));
                                break;
                        }
                                
@@ -512,5 +494,24 @@ public class ParameterizedBuiltin extends Lop
                sb.append(" ; blocked=" + 
this.getOutputParameters().isBlocked());
                return sb.toString();
        }
-
+       
+       /**
+        * 
+        * @param params
+        * @return
+        */
+       private static String compileGenericParamMap(HashMap<String, Lop> 
params) {
+               StringBuilder sb = new StringBuilder();         
+               for ( Entry<String, Lop> e : params.entrySet() ) {
+                       sb.append(e.getKey());
+                       sb.append(NAME_VALUE_SEPARATOR);
+                       if( e.getValue().getDataType() != DataType.SCALAR )
+                               sb.append( 
e.getValue().getOutputParameters().getLabel());
+                       else
+                               sb.append( e.getValue().prepScalarLabel() );
+                       sb.append(OPERAND_DELIMITOR);
+               }
+               
+               return sb.toString();
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index 543acf0..f295fe0 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -1983,6 +1983,14 @@ public class DMLTranslator
                                                                        
target.getValueType(), ParamBuiltinOp.TRANSFORM, 
                                                                        
paramHops);
                        break;  
+               
+               case TRANSFORMAPPLY:
+                       currBuiltinOp = new ParameterizedBuiltinOp(
+                                                                       
target.getName(), target.getDataType(), 
+                                                                       
target.getValueType(), ParamBuiltinOp.TRANSFORMAPPLY, 
+                                                                       
paramHops);
+                       break;  
+                       
                        
                default:
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/parser/Expression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/Expression.java 
b/src/main/java/org/apache/sysml/parser/Expression.java
index 99edb74..afce7bd 100644
--- a/src/main/java/org/apache/sysml/parser/Expression.java
+++ b/src/main/java/org/apache/sysml/parser/Expression.java
@@ -126,7 +126,7 @@ public abstract class Expression
                GROUPEDAGG, RMEMPTY, REPLACE, ORDER, 
                // Distribution Functions
                CDF, INVCDF, PNORM, QNORM, PT, QT, PF, QF, PCHISQ, QCHISQ, 
PEXP, QEXP,
-               TRANSFORM, 
+               TRANSFORM, TRANSFORMAPPLY, 
                INVALID
        };
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
 
b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
index 8d1f454..edda675 100644
--- 
a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
+++ 
b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
@@ -33,8 +33,9 @@ public class ParameterizedBuiltinFunctionExpression extends 
DataIdentifier
        private HashMap<String,Expression> _varParams;
        
        public static final String TF_FN_PARAM_DATA = "target";
-       public static final String TF_FN_PARAM_MTD = "transformPath";
+       public static final String TF_FN_PARAM_MTD2 = "meta";
        public static final String TF_FN_PARAM_SPEC = "spec";
+       public static final String TF_FN_PARAM_MTD = "transformPath"; //NOTE 
MB: for backwards compatibility
        public static final String TF_FN_PARAM_APPLYMTD = "applyTransformPath";
        public static final String TF_FN_PARAM_OUTNAMES = "outputNames";
        
@@ -64,6 +65,7 @@ public class ParameterizedBuiltinFunctionExpression extends 
DataIdentifier
 
                // data transformation functions
                opcodeMap.put("transform",      
Expression.ParameterizedBuiltinFunctionOp.TRANSFORM);
+               opcodeMap.put("transformapply", 
Expression.ParameterizedBuiltinFunctionOp.TRANSFORMAPPLY);
        }
        
        public static HashMap<Expression.ParameterizedBuiltinFunctionOp, 
ParamBuiltinOp> pbHopMap;
@@ -230,7 +232,11 @@ public class ParameterizedBuiltinFunctionExpression 
extends DataIdentifier
                case TRANSFORM:
                        validateTransform(output, conditional);
                        break;
-                       
+               
+               case TRANSFORMAPPLY:
+                       validateTransformApply(output, conditional);
+                       break;
+               
                default: //always unconditional (because unsupported operation)
                        raiseValidateError("Unsupported parameterized function 
"+ this.getOpCode(), false, LanguageErrorCodes.INVALID_PARAMETERS);
                }
@@ -291,6 +297,37 @@ public class ParameterizedBuiltinFunctionExpression 
extends DataIdentifier
                output.setDimensions(-1, -1);
        }
        
+       // example: A = transformapply(target=X, meta=M, spec=s)
+       private void validateTransformApply(DataIdentifier output, boolean 
conditional) 
+               throws LanguageException 
+       {
+               //validate data
+               Expression data = getVarParam(TF_FN_PARAM_DATA);
+               if( data==null )                                
+                       raiseValidateError("Named parameter '" + 
TF_FN_PARAM_DATA + "' missing. Please specify the input data set.", 
conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+               else if( data.getOutput().getDataType() != DataType.FRAME )
+                       raiseValidateError("Input to tansformapply() must be of 
type 'frame'. It is of type '"+data.getOutput().getDataType()+"'.", 
conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+                       
+               //validate meta data (recode maps)
+               Expression mtd = getVarParam(TF_FN_PARAM_MTD2);
+               if( mtd==null )
+                       raiseValidateError("Named parameter '" + 
TF_FN_PARAM_MTD2 + "' missing. Please specify the transformation metadata.", 
conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+               else if( mtd.getOutput().getDataType() != DataType.FRAME )
+                       raiseValidateError("Metadata of tansformapply() must be 
of type 'frame'. It is of type '"+data.getOutput().getDataType()+"'.", 
conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+               
+               //validate specification
+               Expression spec = getVarParam(TF_FN_PARAM_SPEC);
+               if( spec==null )
+                       raiseValidateError("Named parameter '" + 
TF_FN_PARAM_SPEC + "' missing. Please specify the transformation specification 
(JSON string).", conditional, LanguageErrorCodes.INVALID_PARAMETERS);
+               else if( spec.getOutput().getDataType() != DataType.SCALAR  || 
spec.getOutput().getValueType() != ValueType.STRING )
+                       raiseValidateError("Transformation specification '" + 
TF_FN_PARAM_SPEC + "' must be a string value (a scalar).", conditional, 
LanguageErrorCodes.INVALID_PARAMETERS);
+               
+               //set output dimensions
+               output.setDataType(DataType.MATRIX);
+               output.setValueType(ValueType.DOUBLE);
+               output.setDimensions(-1, -1);
+       }
+       
        private void validateReplace(DataIdentifier output, boolean 
conditional) throws LanguageException {
                //check existence and correctness of arguments
                Expression target = getVarParam("target");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
index 4cb67e3..0a99f2a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/CPInstructionParser.java
@@ -181,6 +181,7 @@ public class CPInstructionParser extends InstructionParser
                String2CPInstructionType.put( "replace"     , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2CPInstructionType.put( "rexpand"     , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2CPInstructionType.put( "transform"       , 
CPINSTRUCTION_TYPE.ParameterizedBuiltin);
+               String2CPInstructionType.put( 
"transformapply",CPINSTRUCTION_TYPE.ParameterizedBuiltin);
 
                // Variable Instruction Opcodes 
                String2CPInstructionType.put( "assignvar"   , 
CPINSTRUCTION_TYPE.Variable);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index 2245d4c..314457c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -19,14 +19,13 @@
 
 package org.apache.sysml.runtime.instructions.cp;
 
-import java.io.IOException;
 import java.util.HashMap;
 
-import org.apache.wink.json4j.JSONException;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.parser.Statement;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.functionobjects.ParameterizedBuiltin;
@@ -120,7 +119,9 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
                        func = 
ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
                        return new ParameterizedBuiltinCPInstruction(new 
SimpleOperator(func), paramsMap, out, opcode, str);
                }
-               else if ( opcode.equals("transform")) {
+               else if (   opcode.equals("transform")
+                                || opcode.equals("transformapply")) 
+               {
                        return new ParameterizedBuiltinCPInstruction(null, 
paramsMap, out, opcode, str);
                }
                else {
@@ -226,19 +227,27 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
                }
                else if ( opcode.equalsIgnoreCase("transform")) {
                        MatrixObject mo = (MatrixObject) 
ec.getVariable(params.get("target"));
-                       MatrixObject out = (MatrixObject) 
ec.getVariable(output.getName());
-                       
+                       MatrixObject out = (MatrixObject) 
ec.getVariable(output.getName());                     
                        try {
                                JobReturn jt = 
DataTransform.cpDataTransform(this, new MatrixObject[] { mo } , new 
MatrixObject[] {out} );
                                
out.updateMatrixCharacteristics(jt.getMatrixCharacteristics(0));
-                       } catch (IllegalArgumentException e) {
-                               throw new DMLRuntimeException(e);
-                       } catch (IOException e) {
-                               throw new DMLRuntimeException(e);
-                       } catch (JSONException e) {
+                       } catch (Exception e) {
                                throw new DMLRuntimeException(e);
                        }
                }
+               else if ( opcode.equalsIgnoreCase("transformapply")) {
+                       //sanity checks valid inputs
+                       if( !(ec.getVariable(params.get("target")) instanceof 
FrameObject) )
+                               throw new DMLRuntimeException("Transformapply 
requires FrameObject input for 'target'.");
+                       if( !(ec.getVariable(params.get("meta")) instanceof 
FrameObject) )
+                               throw new DMLRuntimeException("Transformapply 
requires FrameObject input for 'meta'.");
+                       
+                       FrameObject dataobj = (FrameObject) 
ec.getVariable(params.get("target"));
+                       FrameObject metaobj = (FrameObject) 
ec.getVariable(params.get("meta"));
+                       
+                       MatrixBlock mbout = 
DataTransform.cpDataTransform(getParameterMap(), dataobj.getData(), 
metaobj.getData() );
+                       ec.setMatrixOutput(output.getName(), mbout);
+               }
                else {
                        throw new DMLRuntimeException("Unknown opcode : " + 
opcode);
                }               

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 34078f7..cb44d8e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -27,8 +27,10 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -73,7 +75,7 @@ public class FrameBlock implements Writable, Externalizable
        }
        
        public FrameBlock(List<ValueType> schema, List<String> names, 
String[][] data) {
-               _numRows = data.length;
+               _numRows = 0; //maintained on append
                _schema = new ArrayList<ValueType>(schema);
                _colnames = new ArrayList<String>(names);
                _coldata = new ArrayList<Array>();
@@ -119,6 +121,19 @@ public class FrameBlock implements Writable, Externalizable
        }
        
        /**
+        * Creates a mapping from column names to column IDs, i.e., 
+        * 1-based column indexes
+        * 
+        * @return
+        */
+       public Map<String,Integer> getColumnNameIDMap() {
+               Map<String, Integer> ret = new HashMap<String, Integer>();
+               for( int j=0; j<getNumColumns(); j++ )
+                       ret.put(_colnames.get(j), j+1);
+               return ret;     
+       }
+       
+       /**
         * Allocate column data structures if necessary, i.e., if schema 
specified
         * but not all column data structures created yet.
         */

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java 
b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
index fb6df5e..44e9141 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -72,6 +73,7 @@ import org.apache.sysml.runtime.matrix.JobReturn;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -1053,6 +1055,48 @@ public class DataTransform
        }
        
        /**
+        * Apply given transform metadata (incl recode maps) over an in-memory 
frame input in order to
+        * create a transformed numerical matrix. Note: The number of rows 
always remains unchanged, 
+        * whereas the number of column might increase or decrease. 
+        * 
+        * @param params
+        * @param input
+        * @param meta
+        * @param spec
+        * @return
+        * @throws DMLRuntimeException
+        * @throws  
+        */
+       public static MatrixBlock cpDataTransform(HashMap<String,String> 
params, FrameBlock input, FrameBlock meta) 
+               throws DMLRuntimeException
+       {
+               MatrixBlock ret = null;
+               
+               try
+               {
+                       //initialize transform encoders
+                       JSONObject spec = new JSONObject(params.get("spec"));
+                       TfUtils agents = new TfUtils(spec, 
input.getNumColumns());              
+                       agents.getRecodeAgent().initRecodeMaps(meta);
+                       
+                       //core transform apply over input frame and append to 
output
+                       //FIXME number of output columns after encoder creation
+                       ret = new MatrixBlock(input.getNumRows(), 
input.getNumColumns(), false);
+                       Iterator<String[]> iter = input.getStringRowIterator();
+                       for( int i=0; iter.hasNext(); i++ ) {
+                               String[] tmp = agents.apply(iter.next(), true);
+                               for( int j=0; j<tmp.length; j++ )
+                                       ret.appendValue(i, j, 
UtilFunctions.parseToDouble(tmp[j]));
+                       }
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
+               }
+               
+               return ret;
+       }
+       
+       /**
         * Helper function to fetch and sort the list of part files under the 
given
         * input directory.
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java 
b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 295c056..2ca3cfc 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -42,6 +42,9 @@ import org.apache.wink.json4j.JSONObject;
 import scala.Tuple2;
 
 import com.google.common.collect.Ordering;
+
+import org.apache.sysml.lops.Lop;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.transform.MVImputeAgent.MVMethod;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -62,8 +65,14 @@ public class RecodeAgent extends TransformationAgent {
                
                if ( parsedSpec.containsKey(TX_METHOD.RECODE.toString())) 
                {
-                       JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.RECODE.toString());
-                       JSONArray attrs = (JSONArray) obj.get(JSON_ATTRS);
+                       //TODO consolidate external and internal json spec 
definitions
+                       JSONArray attrs = null;
+                       if( parsedSpec.get(TX_METHOD.RECODE.toString()) 
instanceof JSONObject ) {
+                               JSONObject obj = (JSONObject) 
parsedSpec.get(TX_METHOD.RECODE.toString());
+                               attrs = (JSONArray) obj.get(JSON_ATTRS);
+                       }
+                       else
+                               attrs = 
(JSONArray)parsedSpec.get(TX_METHOD.RECODE.toString());
                        
                        _rcdList = new int[attrs.size()];
                        for(int i=0; i < _rcdList.length; i++) 
@@ -96,6 +105,24 @@ public class RecodeAgent extends TransformationAgent {
                }
        }
        
+       /**
+        * Construct the recodemaps from the given input frame for all 
+        * columns registered for recode.
+        * 
+        * @param frame
+        */
+       public void initRecodeMaps( FrameBlock frame ) {
+               for( int j=0; j<_rcdList.length; j++ ) {
+                       int colID = _rcdList[j]; //1-based
+                       HashMap<String,Long> map = new HashMap<String,Long>();
+                       for( int i=0; i<frame.getNumRows(); i++ ) {
+                               String[] tmp = frame.get(i, 
colID-1).toString().split(Lop.DATATYPE_PREFIX);
+                               map.put(tmp[0], Long.parseLong(tmp[1]));
+                       }
+                       _rcdMaps.put(colID, map);
+               }
+       }
+       
        void prepare(String[] words, TfUtils agents) {
                if ( _rcdList == null && _mvrcdList == null )
                        return;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java 
b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 786b111..7ce3228 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -82,6 +82,69 @@ public class TfUtils implements Serializable{
        private String _tmpDir = null;
        private String _outputPath = null;
        
+
+       public TfUtils(JobConf job, boolean minimal) 
+               throws IOException, JSONException 
+       {
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                       ConfigurationManager.setCachedJobConf(job);
+               }
+               
+               _NAstrings = TfUtils.parseNAStrings(job);
+               _specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+               
+               FileSystem fs = FileSystem.get(job);
+               JSONObject spec = TfUtils.readSpec(fs, _specFile);
+               
+               _oa = new OmitAgent(spec);
+       }
+       
+       // called from GenTFMtdMapper, ApplyTf (Hadoop)
+       public TfUtils(JobConf job) 
+               throws IOException, JSONException 
+       {
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
+                       ConfigurationManager.setCachedJobConf(job);
+               }
+               
+               boolean hasHeader = 
Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
+               //Pattern delim = 
Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
+               String[] naStrings = TfUtils.parseNAStrings(job);
+               
+               long numCols = UtilFunctions.parseToLong( 
job.get(MRJobConfiguration.TF_NUM_COLS) );            // #of columns in input 
data
+                       
+               String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
+               String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
+               String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
+               String outputPath = 
FileOutputFormat.getOutputPath(job).toString();
+               FileSystem fs = FileSystem.get(job);
+               JSONObject spec = TfUtils.readSpec(fs, specFile);
+               
+               init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, 
job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, 
tmpPath, outputPath);
+       }
+       
+       // called from GenTfMtdReducer 
+       public TfUtils(JobConf job, String tfMtdDir) throws IOException, 
JSONException 
+       {
+               this(job);
+               _tfMtdDir = tfMtdDir;
+       }
+       
+       // called from GenTFMtdReducer and ApplyTf (Spark)
+       public TfUtils(String headerLine, boolean hasHeader, String delim, 
String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String 
offsetFile, String tmpPath) throws IOException, JSONException {
+               init (headerLine, hasHeader, delim, naStrings, spec, ncol, 
offsetFile, tmpPath, null);
+               _tfMtdDir = tfMtdDir;
+       }
+       
+       //called from cp frame transformapply
+       public TfUtils(JSONObject spec, long inNcol) 
+               throws IOException, JSONException 
+       {
+               //TODO recodemaps handover
+               _numInputCols = inNcol;
+               createAgents(spec);
+       }
+       
        protected static boolean checkValidInputFile(FileSystem fs, Path path, 
boolean err)
                        throws IOException {
                // check non-existing file
@@ -193,59 +256,6 @@ public class TfUtils implements Serializable{
                createAgents(spec);
        }
        
-       public TfUtils(JobConf job, boolean minimal) 
-               throws IOException, JSONException 
-       {
-               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-                       ConfigurationManager.setCachedJobConf(job);
-               }
-               
-               _NAstrings = TfUtils.parseNAStrings(job);
-               _specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-               
-               FileSystem fs = FileSystem.get(job);
-               JSONObject spec = TfUtils.readSpec(fs, _specFile);
-               
-               _oa = new OmitAgent(spec);
-       }
-       
-       // called from GenTFMtdMapper, ApplyTf (Hadoop)
-       public TfUtils(JobConf job) 
-               throws IOException, JSONException 
-       {
-               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-                       ConfigurationManager.setCachedJobConf(job);
-               }
-               
-               boolean hasHeader = 
Boolean.parseBoolean(job.get(MRJobConfiguration.TF_HAS_HEADER));
-               //Pattern delim = 
Pattern.compile(Pattern.quote(job.get(MRJobConfiguration.TF_DELIM)));
-               String[] naStrings = TfUtils.parseNAStrings(job);
-               
-               long numCols = UtilFunctions.parseToLong( 
job.get(MRJobConfiguration.TF_NUM_COLS) );            // #of columns in input 
data
-                       
-               String specFile = job.get(MRJobConfiguration.TF_SPEC_FILE);
-               String offsetFile = job.get(MRJobConfiguration.TF_OFFSETS_FILE);
-               String tmpPath = job.get(MRJobConfiguration.TF_TMP_LOC);
-               String outputPath = 
FileOutputFormat.getOutputPath(job).toString();
-               FileSystem fs = FileSystem.get(job);
-               JSONObject spec = TfUtils.readSpec(fs, specFile);
-               
-               init(job.get(MRJobConfiguration.TF_HEADER), hasHeader, 
job.get(MRJobConfiguration.TF_DELIM), naStrings, spec, numCols, offsetFile, 
tmpPath, outputPath);
-       }
-       
-       // called from GenTfMtdReducer 
-       public TfUtils(JobConf job, String tfMtdDir) throws IOException, 
JSONException 
-       {
-               this(job);
-               _tfMtdDir = tfMtdDir;
-       }
-       
-       // called from GenTFMtdReducer and ApplyTf (Spark)
-       public TfUtils(String headerLine, boolean hasHeader, String delim, 
String[] naStrings, JSONObject spec, long ncol, String tfMtdDir, String 
offsetFile, String tmpPath) throws IOException, JSONException {
-               init (headerLine, hasHeader, delim, naStrings, spec, ncol, 
offsetFile, tmpPath, null);
-               _tfMtdDir = tfMtdDir;
-       }
-       
        public void incrValid() { _numValidRecords++; }
        public long getValid()  { return _numValidRecords; }
        public long getTotal()  { return _numRecordsInPartFile; }
@@ -424,8 +434,7 @@ public class TfUtils implements Serializable{
                        words = getRecodeAgent().apply(words, this);
 
                words = getBinAgent().apply(words, this);
-               words = getDummycodeAgent().apply(words, this);
-               
+               words = getDummycodeAgent().apply(words, this);         
                _numTransformedRows++;
                
                return words;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
index 69950a3..7940095 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java
@@ -22,12 +22,15 @@ package org.apache.sysml.test.integration.functions.jmlc;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Map.Entry;
 
 import org.junit.Assert;
+import org.junit.Test;
 import org.apache.sysml.api.DMLException;
 import org.apache.sysml.api.jmlc.Connection;
 import org.apache.sysml.api.jmlc.PreparedScript;
 import org.apache.sysml.api.jmlc.ResultVariables;
+import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
@@ -46,7 +49,7 @@ public class FrameTransformTest extends AutomatedTestBase
        private final static int rows = 700;
        private final static int cols = 3;
        
-       private final static int nRuns = 10;
+       private final static int nRuns = 2;
        
        private final static double sparsity1 = 0.7;
        private final static double sparsity2 = 0.1;
@@ -57,7 +60,6 @@ public class FrameTransformTest extends AutomatedTestBase
                addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "Y" }) ); 
        }
        
-       /*
        @Test
        public void testJMLCTransformDense() throws IOException {
                runJMLCReuseTest(TEST_NAME1, false, false);
@@ -77,7 +79,6 @@ public class FrameTransformTest extends AutomatedTestBase
        public void testJMLCTransformSparseReuse() throws IOException {
                runJMLCReuseTest(TEST_NAME1, true, true);
        }
-       */
 
        /**
         * 
@@ -86,7 +87,6 @@ public class FrameTransformTest extends AutomatedTestBase
         * @param instType
         * @throws IOException 
         */
-       @SuppressWarnings("unused")
        private void runJMLCReuseTest( String testname, boolean sparse, boolean 
modelReuse ) 
                throws IOException
        {       
@@ -98,13 +98,14 @@ public class FrameTransformTest extends AutomatedTestBase
                //generate inputs
                double[][] Xd = TestUtils.round(getRandomMatrix(rows, cols, 
0.51, 7.49, sparse?sparsity2:sparsity1, 1234));
                String[][] Xs = createFrameData(Xd);
+               String[][] Ms = createRecodeMaps(Xs);
                
                //run DML via JMLC
-               ArrayList<double[][]> Yset = execDMLScriptviaJMLC( TEST_NAME, 
Xs, modelReuse );
+               ArrayList<double[][]> Yset = execDMLScriptviaJMLC( TEST_NAME, 
Xs, Ms, modelReuse );
                
-               //check non-empty y
+               //check correct result (nnz 7 + 0 -> 8 distinct vals)
                for( double[][] data : Yset )
-                       Assert.assertEquals("Wrong result: "+data[0][0]+".", 
new Double(7), new Double(data[0][0]));
+                       Assert.assertEquals("Wrong result: "+data[0][0]+".", 
new Double(8), new Double(data[0][0]));
        }
 
        /**
@@ -114,7 +115,7 @@ public class FrameTransformTest extends AutomatedTestBase
         * @throws DMLException
         * @throws IOException
         */
-       private ArrayList<double[][]> execDMLScriptviaJMLC( String testname, 
String[][] X, boolean modelReuse) 
+       private ArrayList<double[][]> execDMLScriptviaJMLC( String testname, 
String[][] X, String[][] M, boolean modelReuse) 
                throws IOException
        {
                Timing time = new Timing(true);
@@ -128,22 +129,22 @@ public class FrameTransformTest extends AutomatedTestBase
                {
                        //prepare input arguments
                        HashMap<String,String> args = new 
HashMap<String,String>();
-                       args.put("$TRANSFORM_PATH", SCRIPT_DIR + TEST_DIR + 
"/tfmtd");
                        args.put("$TRANSFORM_SPEC", "{ \"ids\": true 
,\"recode\": [ 1, 2, 3] }");
                        
                        //read and precompile script
                        String script = conn.readScript(SCRIPT_DIR + TEST_DIR + 
testname + ".dml");     
-                       PreparedScript pstmt = conn.prepareScript(script, args, 
new String[]{"X"}, new String[]{"Y"}, false);
+                       PreparedScript pstmt = conn.prepareScript(script, args, 
new String[]{"X","M"}, new String[]{"Y"}, false);
                        
                        if( modelReuse )
-                               pstmt.setFrame("X", X);
+                               pstmt.setFrame("M", M, true);
                        
                        //execute script multiple times
                        for( int i=0; i<nRuns; i++ )
                        {
                                //bind input parameters
                                if( !modelReuse )
-                                       pstmt.setFrame("X", X);
+                                       pstmt.setFrame("M", M);
+                               pstmt.setFrame("X", X);
                                
                                //execute script
                                ResultVariables rs = pstmt.executeScript();
@@ -185,4 +186,35 @@ public class FrameTransformTest extends AutomatedTestBase
                
                return ret;
        }
+       
+       private String[][] createRecodeMaps(String[][] data) {
+               //create maps per column
+               ArrayList<HashMap<String,Integer>> map = new 
ArrayList<HashMap<String,Integer>>(); 
+               for( int j=0; j<data[0].length; j++ )
+                       map.add(new HashMap<String,Integer>());
+               //create recode maps per column
+               for( int i=0; i<data.length; i++ ) {
+                       for( int j=0; j<data[i].length; j++ )
+                               if( !map.get(j).containsKey(data[i][j]) )
+                                       map.get(j).put(data[i][j], 
map.get(j).size()+1);
+               }
+               //determine max recode map size
+               int max = 0;
+               for( int j=0; j<data[0].length; j++ )
+                       max = Math.max(max, map.get(j).size());
+               
+               //allocate output
+               String[][] ret = new String[max][];
+               for( int i=0; i<max; i++ )
+                       ret[i] = new String[data[0].length];
+               
+               //create frame of recode maps
+               for( int j=0; j<data[0].length; j++) {
+                       int i = 0;
+                       for( Entry<String, Integer> e : map.get(j).entrySet() )
+                               ret[i++][j] = 
e.getKey()+Lop.DATATYPE_PREFIX+e.getValue();
+               }
+               
+               return ret;
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e69a1c26/src/test/scripts/functions/jmlc/transform.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/jmlc/transform.dml 
b/src/test/scripts/functions/jmlc/transform.dml
index 1fce0bb..a7fe77a 100644
--- a/src/test/scripts/functions/jmlc/transform.dml
+++ b/src/test/scripts/functions/jmlc/transform.dml
@@ -19,10 +19,11 @@
 #
 #-------------------------------------------------------------
 
-X = read($X, data_type="frame", format="csv");
+X = read($X, data_type="frame", format="csv"); #new data
+M = read($M, data_type="frame", format="csv"); #existing recode maps
 specJson = $TRANSFORM_SPEC
 
-Xt = transform(target=X, transformPath=$TRANSFORM_PATH, spec=specJson);
+Xt = transformapply(target=X, meta=M, spec=specJson);
 
 V = matrix(Xt, rows=nrow(Xt)*ncol(Xt), cols=1);
 Y = as.matrix(sum(table(V, 1) != 0))


Reply via email to