[SYSTEMML-569] Distributed spark transform apply over frames, tests

This patch introduces the Spark transform apply instruction, which works
directly over frame inputs and metadata. Its functional capabilities
are still limited to recoding, though. Furthermore, we generalized the
existing print frame/matrix block functionality and cleaned up the
binary block frame-matrix conversion.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b7582331
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b7582331
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b7582331

Branch: refs/heads/master
Commit: b758233105ae5a859eaee4f0e82b20d832bdcc49
Parents: b5842fb
Author: Matthias Boehm <[email protected]>
Authored: Mon Jun 13 20:41:58 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Jun 13 20:41:58 2016 -0700

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |   2 +-
 .../functionobjects/ParameterizedBuiltin.java   |   6 +-
 .../instructions/SPInstructionParser.java       |   1 +
 .../instructions/spark/CastSPInstruction.java   |   6 +-
 .../ParameterizedBuiltinSPInstruction.java      |  67 ++++++++-
 .../spark/utils/FrameRDDConverterUtils.java     |   8 +-
 .../transform/encode/EncoderFactory.java        |  14 ++
 .../sysml/runtime/util/DataConverter.java       |  18 +++
 .../functions/frame/FrameConverterTest.java     |   5 +-
 .../functions/transform/TransformFrameTest.java | 150 +++++++++++++++++++
 .../scripts/functions/transform/ApplyFrame.dml  |  29 ++++
 .../functions/transform/input/homes3/homes.csv  | 149 ++++++++++++++++++
 .../transform/input/homes3/homes.csv.mtd        |   5 +
 .../input/homes3/homes.tfspec_recode.json       |   2 +
 14 files changed, 449 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java 
b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index b2a1807..02d3715 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -1120,7 +1120,7 @@ public class ParameterizedBuiltinOp extends Hop 
implements MultiThreadedHop
                }
                
                //force CP for in-memory only transform builtins
-               if( _op == ParamBuiltinOp.TRANSFORMAPPLY
+               if( (_op == ParamBuiltinOp.TRANSFORMAPPLY && 
REMOTE==ExecType.MR)
                        || _op == ParamBuiltinOp.TRANSFORMDECODE
                        || _op == ParamBuiltinOp.TRANSFORMMETA 
                        ||  _op == ParamBuiltinOp.TOSTRING) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
 
b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
index 7c10bca..2bd19fa 100644
--- 
a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
+++ 
b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java
@@ -44,7 +44,7 @@ public class ParameterizedBuiltin extends ValueFunction
 
        private static final long serialVersionUID = -5966242955816522697L;
        
-       public enum ParameterizedBuiltinCode { INVALID, CDF, INVCDF, RMEMPTY, 
REPLACE, REXPAND, TRANSFORM };
+       public enum ParameterizedBuiltinCode { INVALID, CDF, INVCDF, RMEMPTY, 
REPLACE, REXPAND, TRANSFORM, TRANSFORMAPPLY };
        public enum ProbabilityDistributionCode { INVALID, NORMAL, EXP, CHISQ, 
F, T };
        
        public ParameterizedBuiltinCode bFunc;
@@ -60,6 +60,7 @@ public class ParameterizedBuiltin extends ValueFunction
                String2ParameterizedBuiltinCode.put( "replace", 
ParameterizedBuiltinCode.REPLACE);
                String2ParameterizedBuiltinCode.put( "rexpand", 
ParameterizedBuiltinCode.REXPAND);
                String2ParameterizedBuiltinCode.put( "transform", 
ParameterizedBuiltinCode.TRANSFORM);
+               String2ParameterizedBuiltinCode.put( "transformapply", 
ParameterizedBuiltinCode.TRANSFORMAPPLY);
        }
        
        static public HashMap<String, ProbabilityDistributionCode> 
String2DistCode;
@@ -167,6 +168,9 @@ public class ParameterizedBuiltin extends ValueFunction
                        case TRANSFORM:
                                return new 
ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORM);
                        
+                       case TRANSFORMAPPLY:
+                               return new 
ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMAPPLY);
+                               
                        default:
                                throw new DMLRuntimeException("Invalid 
parameterized builtin code: " + code);
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 430aaf4..52b712b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -212,6 +212,7 @@ public class SPInstructionParser extends InstructionParser
                String2SPInstructionType.put( "replace"      , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( "rexpand"      , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                String2SPInstructionType.put( "transform"    , 
SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+               String2SPInstructionType.put( 
"transformapply",SPINSTRUCTION_TYPE.ParameterizedBuiltin);
                
                String2SPInstructionType.put( "mappend", 
SPINSTRUCTION_TYPE.MAppend);
                String2SPInstructionType.put( "rappend", 
SPINSTRUCTION_TYPE.RAppend);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
index 3562d18..d869f11 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
-import org.apache.hadoop.io.LongWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.UnaryCP;
@@ -29,7 +28,6 @@ import 
org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
@@ -73,12 +71,10 @@ public class CastSPInstruction extends UnarySPInstruction
                
                //convert frame-matrix / matrix-frame and set output
                if( opcode.equals(UnaryCP.CAST_AS_MATRIX_OPCODE) ) {
-                       //TODO: simplify converter api to allow long indexes to 
be passed in
                        MatrixCharacteristics mcOut = new 
MatrixCharacteristics(mcIn);
                        mcOut.setBlockSize(ConfigurationManager.getBlocksize(), 
ConfigurationManager.getBlocksize());
-                       in = ((JavaPairRDD<Long, FrameBlock>)in).mapToPair(new 
LongFrameToLongWritableFrameFunction());
                        out = FrameRDDConverterUtils.binaryBlockToMatrixBlock(
-                               (JavaPairRDD<LongWritable, FrameBlock>)in, 
mcIn, mcOut);
+                                       (JavaPairRDD<Long, FrameBlock>)in, 
mcIn, mcOut);
                }
                else if( opcode.equals(UnaryCP.CAST_AS_FRAME_OPCODE) ) {
                        out = 
FrameRDDConverterUtils.matrixBlockToBinaryBlockLongIndex(sec.getSparkContext(), 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index c09c1fc..ae77c35 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
 
 import scala.Tuple2;
 
@@ -50,9 +51,12 @@ import 
org.apache.sysml.runtime.instructions.spark.functions.ExtractGroupNWeight
 import 
org.apache.sysml.runtime.instructions.spark.functions.PerformGroupByAggInCombiner;
 import 
org.apache.sysml.runtime.instructions.spark.functions.PerformGroupByAggInReducer;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ReplicateVectorFunction;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
@@ -66,6 +70,9 @@ import 
org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTy
 import org.apache.sysml.runtime.matrix.operators.Operator;
 import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
 import org.apache.sysml.runtime.transform.DataTransform;
+import org.apache.sysml.runtime.transform.encode.Encoder;
+import org.apache.sysml.runtime.transform.encode.EncoderFactory;
+import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction 
@@ -156,7 +163,8 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        }
                        else if(   opcode.equalsIgnoreCase("rexpand") 
                                        || opcode.equalsIgnoreCase("replace")
-                                       || opcode.equalsIgnoreCase("transform") 
) 
+                                       || opcode.equalsIgnoreCase("transform")
+                                       || 
opcode.equalsIgnoreCase("transformapply")) 
                        {
                                func = 
ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
                                return new 
ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, 
opcode, str, false);
@@ -169,6 +177,7 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
        
 
        @Override 
+       @SuppressWarnings("unchecked")
        public void processInstruction(ExecutionContext ec) 
                throws DMLRuntimeException 
        {
@@ -401,6 +410,36 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                                throw new DMLRuntimeException(e);
                        }
                }
+               else if ( opcode.equalsIgnoreCase("transformapply") ) 
+               {
+                       //get input RDD and meta data
+                       FrameObject fo = 
sec.getFrameObject(params.get("target"));
+                       JavaPairRDD<Long,FrameBlock> in = 
(JavaPairRDD<Long,FrameBlock>)
+                                       sec.getRDDHandleForFrameObject(fo, 
InputInfo.BinaryBlockInputInfo);
+                       FrameBlock meta = 
sec.getFrameInput(params.get("meta"));                
+                       MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(params.get("target"));
+                       MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
+                       mcOut.setDimension(mcIn.getRows(), mcIn.getCols()); 
//TODO encoder awareness
+                       
+                       //create encoder broadcast (avoiding replication per 
task) 
+                       Encoder encoder = 
EncoderFactory.createEncoder(params.get("spec"), 
+                                       fo.getSchema(), 
(int)fo.getNumColumns(), meta);
+                       Broadcast<Encoder> bmeta = 
sec.getSparkContext().broadcast(encoder);
+               
+                       //execute transform apply
+                       JavaPairRDD<Long,FrameBlock> tmp = in
+                                       .mapValues(new 
RDDTransformApplyFunction(bmeta));
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
FrameRDDConverterUtils
+                                       .binaryBlockToMatrixBlock(tmp, mcIn, 
mcOut);
+                       
+                       //set output and maintain lineage/output characteristics
+                       sec.setRDDHandleForVariable(output.getName(), out);
+                       sec.addLineageRDD(output.getName(), 
params.get("target"));
+                       ec.releaseFrameInput(params.get("meta"));
+               }
+               else {
+                       throw new DMLRuntimeException("Unknown parameterized 
builtin opcode: "+opcode);
+               }
        }
        
 
@@ -642,6 +681,32 @@ public class ParameterizedBuiltinSPInstruction  extends 
ComputationSPInstruction
                        return new MatrixCell(val);
                }
        }
+
+       /**
+        * 
+        */
+       public static class RDDTransformApplyFunction implements 
Function<FrameBlock,FrameBlock> 
+       {
+               private static final long serialVersionUID = 
5759813006068230916L;
+               
+               private Broadcast<Encoder> _bencoder = null;
+               
+               public RDDTransformApplyFunction(Broadcast<Encoder> bencoder) {
+                       _bencoder = bencoder;
+               }
+
+               @Override
+               public FrameBlock call(FrameBlock in) 
+                       throws Exception 
+               {
+                       //execute block transform apply
+                       Encoder encoder = _bencoder.getValue();
+                       MatrixBlock tmp = encoder.apply(in, new 
MatrixBlock(in.getNumRows(), in.getNumColumns(), false));
+                       
+                       //convert to frameblock to reuse frame-matrix reblock
+                       return DataConverter.convertToFrameBlock(tmp);
+               }
+       }
        
        /**
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index b2b1a97..274efe0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -282,7 +282,7 @@ public class FrameRDDConverterUtils
         * @param strict
         * @return
         */
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
binaryBlockToMatrixBlock(JavaPairRDD<LongWritable,FrameBlock> input, 
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
binaryBlockToMatrixBlock(JavaPairRDD<Long,FrameBlock> input, 
                        MatrixCharacteristics mcIn, MatrixCharacteristics 
mcOut) 
        {
                //convert binary block to matrix block
@@ -726,7 +726,7 @@ public class FrameRDDConverterUtils
        /**
         * 
         */
-       private static class BinaryBlockToMatrixBlockFunction implements 
PairFlatMapFunction<Tuple2<LongWritable,FrameBlock>,MatrixIndexes, MatrixBlock> 
+       private static class BinaryBlockToMatrixBlockFunction implements 
PairFlatMapFunction<Tuple2<Long,FrameBlock>,MatrixIndexes, MatrixBlock> 
        {
                private static final long serialVersionUID = 
-2654986510471835933L;
                
@@ -739,10 +739,10 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<LongWritable, FrameBlock> arg0)
+               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<Long, FrameBlock> arg0)
                        throws Exception 
                {
-                       long rowIndex = arg0._1().get();
+                       long rowIndex = arg0._1();
                        FrameBlock blk = arg0._2();
                        
                        ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
index 7d8c00b..75fe639 100644
--- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -44,6 +44,20 @@ public class EncoderFactory
                return createEncoder(spec, Collections.nCopies(clen, 
ValueType.STRING), meta);
        }
        
+       /**
+        * 
+        * @param spec
+        * @param schema
+        * @param clen
+        * @param meta
+        * @return
+        * @throws DMLRuntimeException
+        */
+       public static Encoder createEncoder(String spec, List<ValueType> 
schema, int clen, FrameBlock meta) throws DMLRuntimeException {
+               List<ValueType> lschema = (schema==null) ? 
Collections.nCopies(clen, ValueType.STRING) : schema;
+               return createEncoder(spec, lschema, meta);
+       }
+       
        
        /**
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java 
b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 846b2b2..08a6e8d 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -902,6 +902,15 @@ public class DataConverter
        }
        
        /**
+        * 
+        * @param mb
+        * @return
+        */
+       public static String toString(MatrixBlock mb) {
+               return toString(mb, false, " ", "\n", mb.getNumRows(), 
mb.getNumColumns(), 3);
+       }
+       
+       /**
         * Returns a string representation of a matrix
         * @param mb
         * @param sparse if true, string will contain a table with row index, 
col index, value (where value != 0.0)
@@ -977,6 +986,15 @@ public class DataConverter
        /**
         * 
         * @param fb
+        * @return
+        */
+       public static String toString(FrameBlock fb) {
+               return toString(fb, false, " ", "\n", fb.getNumRows(), 
fb.getNumColumns(), 3);
+       }
+       
+       /**
+        * 
+        * @param fb
         * @param sparse
         * @param separator
         * @param lineseparator

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index e271824..7357989 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -39,6 +39,7 @@ import 
org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongWritableFrameToLongFrameFunction;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.io.FrameWriter;
@@ -483,7 +484,9 @@ public class FrameConverterTest extends AutomatedTestBase
                        case BIN2MAT: {
                                InputInfo iinfo = 
InputInfo.BinaryBlockInputInfo;
                                OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
-                               JavaPairRDD<LongWritable, FrameBlock> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, 
FrameBlock.class);
+                               JavaPairRDD<Long, FrameBlock> rddIn = sc
+                                               .hadoopFile(fnameIn, 
iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
+                                               .mapToPair(new 
LongWritableFrameToLongFrameFunction());
                                JavaPairRDD<MatrixIndexes,MatrixBlock> rddOut = 
FrameRDDConverterUtils.binaryBlockToMatrixBlock(rddIn, mc, mcMatrix);
                                rddOut.saveAsHadoopFile(fnameOut, 
MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass);
                                break;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
new file mode 100644
index 0000000..e8837cc
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.transform;
+
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.io.MatrixReaderFactory;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class TransformFrameTest extends AutomatedTestBase 
+{
+       private final static String TEST_NAME1 = "Transform";
+       private final static String TEST_NAME2 = "ApplyFrame";
+       private final static String TEST_DIR = "functions/transform/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
TransformFrameTest.class.getSimpleName() + "/";
+       
+       private final static String DATASET     = "homes3/homes.csv";
+       private final static String SPEC1               = 
"homes3/homes.tfspec_recode.json"; 
+       private final static String SPEC2               = 
"homes3/homes.tfspec_dummy.json";
+       private final static String SPEC3               = 
"homes3/homes.tfspec_bin.json";
+       private final static String SPEC4               = 
"homes3/homes.tfspec_impute.json";
+       private final static String SPEC5               = 
"homes3/homes.tfspec_omit.json";
+       
+       public enum TransformType {
+               RECODE,
+               DUMMY,
+               BIN,
+               IMPUTE,
+               OMIT,
+       }
+       
+       @Override
+       public void setUp()  {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, 
+                       new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new 
String[] { "y" }) );
+       }
+       
+       @Test
+       public void testHomesRecodeSingleNodeCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", 
TransformType.RECODE);
+       }
+       
+       @Test
+       public void testHomesRecodeSparkCSV() {
+               runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", 
TransformType.RECODE);
+       }
+
+       /**
+        * 
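+        * @param rt runtime platform to execute the test under
+        * @param ofmt output format (only "csv" is supported)
+        * @param type transform type that selects the transform specification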
+        */
+       private void runTransformTest( RUNTIME_PLATFORM rt, String ofmt, 
TransformType type )
+       {
+               //set runtime platform
+               RUNTIME_PLATFORM rtold = rtplatform;
+               boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
+               rtplatform = rt;
+
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == 
RUNTIME_PLATFORM.HYBRID_SPARK)
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+               //set transform specification
+               String SPEC = null;
+               switch( type ) {
+                       case RECODE: SPEC = SPEC1; break;
+                       case DUMMY:  SPEC = SPEC2; break;
+                       case BIN:    SPEC = SPEC3; break;
+                       case IMPUTE: SPEC = SPEC4; break;
+                       case OMIT:   SPEC = SPEC5; break;
+               }
+
+               if( !ofmt.equals("csv") )
+                       throw new RuntimeException("Unsupported test output 
format");
+               
+               try
+               {
+                       getAndLoadTestConfiguration(TEST_NAME1);
+       
+                       /* This is for running the junit test the new way, 
i.e., construct the arguments directly */
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+                       programArgs = new String[]{"-nvargs", 
+                               "DATA=" + HOME + "input/" + DATASET,
+                               "TFSPEC=" + HOME + "input/" + SPEC,
+                               "TFMTD=" + output("tfmtd"),
+                               "TFDATA=" + output("tfout"),
+                               "OFMT=" + ofmt };
+       
+                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
+                       runTest(true, false, null, -1); 
+                       
+                       fullDMLScriptName = HOME + TEST_NAME2 + ".dml";
+                       programArgs = new String[]{"-explain","-nvargs", 
+                               "DATA=" + HOME + "input/" + DATASET,
+                               "TFSPEC=" + HOME + "input/" + SPEC,
+                               "APPLYMTD=" + output("tfmtd"),  // generated 
above
+                               "TFDATA=" + output("test_tfout"),
+                               "OFMT=" + ofmt };
+       
+                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
+                       runTest(true, false, null, -1); 
+                       
+                       //read both outputs and compare
+                       MatrixReader reader1 = 
MatrixReaderFactory.createMatrixReader(InputInfo.CSVInputInfo);
+                       MatrixBlock mb1 = 
reader1.readMatrixFromHDFS(output("tfout"), -1, -1, -1, -1, -1);
+                       MatrixReader reader2 = 
MatrixReaderFactory.createMatrixReader(InputInfo.CSVInputInfo);
+                       MatrixBlock mb2 = 
reader2.readMatrixFromHDFS(output("test_tfout"), -1, -1, -1, -1, -1);
+                       double[][] R1 = 
DataConverter.convertToDoubleMatrix(mb1);
+                       double[][] R2 = 
DataConverter.convertToDoubleMatrix(mb2);
+                       TestUtils.compareMatrices(R1, R2, R1.length, 
R1[0].length, 0);                  
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       rtplatform = rtold;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/ApplyFrame.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/ApplyFrame.dml 
b/src/test/scripts/functions/transform/ApplyFrame.dml
new file mode 100644
index 0000000..4fbcc36
--- /dev/null
+++ b/src/test/scripts/functions/transform/ApplyFrame.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+raw = read($DATA);  # load the input data set from the path passed as $DATA
+jspec = read($TFSPEC, data_type="scalar", value_type="string")  # read the transform spec file as one string scalar
+
+M = transformmeta(spec=jspec, transformPath=$APPLYMTD);  # obtain meta data for the spec from $APPLYMTD (presumably produced by an earlier transform run -- confirm against the test driver)
+A = transformapply(target=raw, spec=jspec, meta=M);  # apply the spec to raw, using the meta data M
+
+write(A, $TFDATA, format=$OFMT);  # store the result at $TFDATA in the format named by $OFMT
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/input/homes3/homes.csv
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/input/homes3/homes.csv b/src/test/scripts/functions/transform/input/homes3/homes.csv
new file mode 100644
index 0000000..e4706a3
--- /dev/null
+++ b/src/test/scripts/functions/transform/input/homes3/homes.csv
@@ -0,0 +1,149 @@
+zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice
+95141,west,1373,7,1,3,FALSE,695,698
+91312,south,3261,6,2,2,FALSE,902,906
+94555,north,1835,3,3,3,TRUE,888,892
+95141,east,2833,6,2.5,2,TRUE,927,932
+96334,south,2742,6,2.5,2,FALSE,872,876
+96334,north,2195,5,2.5,2,FALSE,799,803
+98755,north,3469,7,2.5,2,FALSE,958,963
+96334,west,1685,7,1.5,2,TRUE,757,760
+95141,west,2238,4,3,3,FALSE,894,899
+91312,west,1245,4,1,1,FALSE,547,549
+98755,south,3702,7,3,1,FALSE,959,964
+98755,north,1865,7,1,2,TRUE,742,745
+94555,north,3837,3,1,1,FALSE,839,842
+91312,west,2139,3,1,3,TRUE,820,824
+95141,north,3824,4,3,1,FALSE,954,958
+98755,east,2858,5,1.5,1,FALSE,759,762
+91312,south,1827,7,3,1,FALSE,735,738
+91312,south,3557,2,2.5,1,FALSE,888,892
+91312,south,2553,2,2.5,2,TRUE,884,889
+96334,west,1682,3,1.5,1,FALSE,625,628
+98755,south,3926,6,2,2,TRUE,1040,1044
+96334,east,2790,5,2.5,3,FALSE,923,927
+95141,west,3928,4,2,3,FALSE,1037,1042
+94555,south,2000,1,3,2,TRUE,844,848
+96334,east,2688,5,2,3,TRUE,938,943
+94555,south,3533,2,1.5,2,FALSE,890,894
+94555,south,2639,3,2,3,FALSE,876,880
+91312,west,2462,4,2.5,1,TRUE,828,832
+94555,north,2420,1,1.5,3,FALSE,809,813
+96334,west,3911,6,1.5,3,TRUE,1072,1077
+96334,west,2166,6,1.5,3,TRUE,867,872
+94555,south,3855,6,2,1,FALSE,914,918
+91312,west,1971,2,3,1,TRUE,790,794
+95141,east,1769,4,1,1,TRUE,652,655
+98755,north,3774,6,1,2,TRUE,961,966
+96334,west,1044,2,3,1,TRUE,685,688
+95141,north,2561,7,1.5,1,TRUE,790,794
+95141,south,2255,2,2,3,TRUE,883,887
+94555,north,3085,6,2,1,FALSE,819,823
+98755,south,1273,2,1.5,2,FALSE,628,631
+91312,west,3785,5,3,3,TRUE,1133,1138
+91312,west,3270,7,1.5,3,FALSE,943,947
+98755,south,1749,2,2,2,FALSE,712,715
+98755,south,1625,7,1.5,2,FALSE,691,694
+96334,north,3010,7,1.5,2,FALSE,848,851
+91312,south,3919,5,1,3,TRUE,1033,1038
+91312,south,1976,1,2,1,TRUE,726,729
+91312,west,3953,3,1.5,3,FALSE,1006,1010
+95141,west,3439,4,2,2,FALSE,921,925
+94555,east,3570,7,1,2,TRUE,934,938
+98755,west,2484,5,3,2,TRUE,926,931
+94555,south,1349,3,2,2,TRUE,721,724
+98755,west,3967,3,2,3,TRUE,1095,1100
+94555,west,2090,7,3,2,TRUE,885,889
+96334,east,1856,5,2,3,TRUE,841,845
+91312,north,1922,2,3,1,TRUE,775,778
+98755,north,1199,3,3,3,FALSE,761,765
+96334,east,2108,3,1,3,TRUE,806,810
+96334,west,3901,4,2,2,FALSE,976,981
+94555,south,2654,6,1.5,2,TRUE,859,863
+94555,west,3805,6,2,3,TRUE,1085,1090
+95141,south,3199,4,2,3,FALSE,947,951
+98755,west,3786,5,1,1,TRUE,909,913
+94555,east,2160,1,1,1,FALSE,629,631
+95141,east,3152,7,2,1,TRUE,883,887
+94555,east,1592,2,3,2,TRUE,791,795
+95141,east,3903,1,2.5,2,FALSE,976,981
+91312,south,1076,2,2.5,1,FALSE,597,600
+96334,west,1719,1,1.5,3,FALSE,738,742
+94555,north,1439,4,1.5,1,FALSE,589,592
+91312,east,1961,2,3,1,TRUE,775,778
+94555,north,2471,1,1.5,1,TRUE,753,756
+91312,west,3930,4,2.5,2,FALSE,1004,1009
+95141,south,2833,1,1,1,FALSE,718,721
+96334,south,2580,4,1,2,TRUE,816,820
+94555,south,2169,3,2.5,3,TRUE,904,908
+95141,east,3329,4,3,3,TRUE,1064,1069
+96334,south,3392,4,2,3,TRUE,1026,1031
+96334,east,3688,6,2.5,3,FALSE,1032,1037
+98755,west,3347,3,2.5,2,TRUE,991,996
+95141,east,1810,5,1,1,FALSE,606,609
+95141,east,3753,1,2.5,2,FALSE,959,963
+94555,east,3906,2,1.5,1,FALSE,866,870
+96334,east,1732,3,2,1,TRUE,700,703
+96334,south,2188,4,2,1,TRUE,767,771
+96334,south,3750,6,2,2,FALSE,963,967
+98755,north,2331,1,1.5,1,TRUE,740,743
+94555,north,1512,4,3,3,TRUE,854,858
+98755,north,3352,3,3,3,FALSE,1014,1018
+94555,south,3426,3,2.5,2,FALSE,937,941
+98755,south,3211,5,3,1,TRUE,948,953
+98755,west,2747,2,2.5,1,FALSE,803,806
+96334,east,3952,6,1.5,1,TRUE,946,950
+91312,north,3814,6,1.5,2,FALSE,934,938
+95141,south,3700,7,2.5,1,FALSE,929,933
+95141,east,3154,4,2.5,1,TRUE,898,902
+91312,south,2648,4,1.5,2,FALSE,793,797
+98755,north,1394,4,1.5,1,FALSE,587,590
+91312,west,2709,5,2,2,FALSE,837,841
+94555,east,3946,6,1,2,TRUE,974,978
+91312,north,3905,6,2,2,FALSE,973,977
+98755,east,3248,5,1.5,1,TRUE,860,864
+96334,north,1774,7,1.5,1,FALSE,644,647
+94555,east,1995,2,3,3,TRUE,897,902
+94555,east,2876,2,3,1,FALSE,828,832
+94555,east,3229,4,2,3,TRUE,995,1000
+94555,north,1079,5,2,2,FALSE,638,641
+95141,south,3695,7,2.5,3,FALSE,1046,1051
+96334,west,3694,5,1,1,TRUE,897,901
+98755,west,1918,5,1,2,FALSE,693,697
+94555,south,1647,6,1,2,TRUE,713,716
+96334,west,2691,3,2.5,2,FALSE,858,862
+95141,south,1333,2,2,2,TRUE,716,719
+95141,west,2609,4,2,1,FALSE,765,768
+91312,west,2125,3,1,2,TRUE,760,763
+91312,west,2417,5,1,1,FALSE,689,692
+98755,west,3623,2,1,3,TRUE,995,999
+98755,north,3343,6,3,1,FALSE,908,912
+96334,south,1074,7,2.5,3,FALSE,739,743
+96334,south,2972,3,1,2,TRUE,858,862
+91312,east,1637,2,2,1,FALSE,626,629
+91312,north,1807,2,3,2,FALSE,765,768
+95141,north,1457,2,3,1,FALSE,667,670
+91312,west,3043,6,1,1,FALSE,766,770
+91312,west,3045,6,1.5,3,TRUE,967,972
+98755,north,1980,5,1,1,TRUE,688,691
+98755,west,1112,3,1.5,3,TRUE,732,735
+98755,south,1533,6,1.5,3,FALSE,734,738
+91312,east,1442,5,2,2,FALSE,675,678
+91312,north,3171,6,1,3,TRUE,945,949
+96334,east,3072,5,1.5,2,FALSE,842,846
+94555,east,3506,4,1.5,3,TRUE,1000,1005
+94555,south,1574,2,1,3,FALSE,691,694
+94555,east,3567,6,3,1,FALSE,926,931
+91312,south,1194,1,1,2,TRUE,637,640
+94555,east,1031,3,1.5,1,FALSE,532,535
+94555,south,2776,3,2.5,2,TRUE,916,920
+91312,south,2009,5,1.5,1,TRUE,719,723
+96334,north,3784,2,1,2,FALSE,889,893
+94555,west,1975,6,1.5,2,FALSE,729,732
+98755,west,2444,2,3,2,FALSE,854,857
+95141,south,1684,3,1.5,3,FALSE,737,740
+98755,north,1729,6,1,1,TRUE,663,666
+95141,north,2236,1,1,2,FALSE,702,705
+95141,south,2061,7,3,1,FALSE,764,768
+98755,south,3561,3,2.5,3,TRUE,1070,1075
+94555,east,2143,3,1,2,FALSE,694,697
+96334,north,3840,7,1,1,FALSE,858,862

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd b/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd
new file mode 100644
index 0000000..c47626a
--- /dev/null
+++ b/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd
@@ -0,0 +1,5 @@
+{  
+    "data_type": "frame",  
+    "format": "csv",  
+    "header": true,  
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json
new file mode 100644
index 0000000..c8aa0ea
--- /dev/null
+++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json
@@ -0,0 +1,2 @@
+{
+ "ids": true, "recode": [ 1, 2, 7 ] }
\ No newline at end of file

Reply via email to