[SYSTEMML-569] Distributed Spark transform apply over frames, tests. This patch introduces the Spark transform apply instruction, which works directly over frame inputs and metadata. Its functional capabilities are still limited to recoding, though. Furthermore, we generalized the existing functionality for printing frame/matrix blocks and cleaned up the binary-block frame-to-matrix conversion.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b7582331 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b7582331 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b7582331 Branch: refs/heads/master Commit: b758233105ae5a859eaee4f0e82b20d832bdcc49 Parents: b5842fb Author: Matthias Boehm <[email protected]> Authored: Mon Jun 13 20:41:58 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Jun 13 20:41:58 2016 -0700 ---------------------------------------------------------------------- .../sysml/hops/ParameterizedBuiltinOp.java | 2 +- .../functionobjects/ParameterizedBuiltin.java | 6 +- .../instructions/SPInstructionParser.java | 1 + .../instructions/spark/CastSPInstruction.java | 6 +- .../ParameterizedBuiltinSPInstruction.java | 67 ++++++++- .../spark/utils/FrameRDDConverterUtils.java | 8 +- .../transform/encode/EncoderFactory.java | 14 ++ .../sysml/runtime/util/DataConverter.java | 18 +++ .../functions/frame/FrameConverterTest.java | 5 +- .../functions/transform/TransformFrameTest.java | 150 +++++++++++++++++++ .../scripts/functions/transform/ApplyFrame.dml | 29 ++++ .../functions/transform/input/homes3/homes.csv | 149 ++++++++++++++++++ .../transform/input/homes3/homes.csv.mtd | 5 + .../input/homes3/homes.tfspec_recode.json | 2 + 14 files changed, 449 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java index b2a1807..02d3715 100644 --- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java +++ 
b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java @@ -1120,7 +1120,7 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop } //force CP for in-memory only transform builtins - if( _op == ParamBuiltinOp.TRANSFORMAPPLY + if( (_op == ParamBuiltinOp.TRANSFORMAPPLY && REMOTE==ExecType.MR) || _op == ParamBuiltinOp.TRANSFORMDECODE || _op == ParamBuiltinOp.TRANSFORMMETA || _op == ParamBuiltinOp.TOSTRING) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java index 7c10bca..2bd19fa 100644 --- a/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java +++ b/src/main/java/org/apache/sysml/runtime/functionobjects/ParameterizedBuiltin.java @@ -44,7 +44,7 @@ public class ParameterizedBuiltin extends ValueFunction private static final long serialVersionUID = -5966242955816522697L; - public enum ParameterizedBuiltinCode { INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, TRANSFORM }; + public enum ParameterizedBuiltinCode { INVALID, CDF, INVCDF, RMEMPTY, REPLACE, REXPAND, TRANSFORM, TRANSFORMAPPLY }; public enum ProbabilityDistributionCode { INVALID, NORMAL, EXP, CHISQ, F, T }; public ParameterizedBuiltinCode bFunc; @@ -60,6 +60,7 @@ public class ParameterizedBuiltin extends ValueFunction String2ParameterizedBuiltinCode.put( "replace", ParameterizedBuiltinCode.REPLACE); String2ParameterizedBuiltinCode.put( "rexpand", ParameterizedBuiltinCode.REXPAND); String2ParameterizedBuiltinCode.put( "transform", ParameterizedBuiltinCode.TRANSFORM); + String2ParameterizedBuiltinCode.put( "transformapply", ParameterizedBuiltinCode.TRANSFORMAPPLY); } static public HashMap<String, 
ProbabilityDistributionCode> String2DistCode; @@ -167,6 +168,9 @@ public class ParameterizedBuiltin extends ValueFunction case TRANSFORM: return new ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORM); + case TRANSFORMAPPLY: + return new ParameterizedBuiltin(ParameterizedBuiltinCode.TRANSFORMAPPLY); + default: throw new DMLRuntimeException("Invalid parameterized builtin code: " + code); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java index 430aaf4..52b712b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java @@ -212,6 +212,7 @@ public class SPInstructionParser extends InstructionParser String2SPInstructionType.put( "replace" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "rexpand" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "transform" , SPINSTRUCTION_TYPE.ParameterizedBuiltin); + String2SPInstructionType.put( "transformapply",SPINSTRUCTION_TYPE.ParameterizedBuiltin); String2SPInstructionType.put( "mappend", SPINSTRUCTION_TYPE.MAppend); String2SPInstructionType.put( "rappend", SPINSTRUCTION_TYPE.RAppend); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java index 3562d18..d869f11 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java @@ -19,7 +19,6 @@ package org.apache.sysml.runtime.instructions.spark; -import org.apache.hadoop.io.LongWritable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.lops.UnaryCP; @@ -29,7 +28,6 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; -import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.FrameBlock; @@ -73,12 +71,10 @@ public class CastSPInstruction extends UnarySPInstruction //convert frame-matrix / matrix-frame and set output if( opcode.equals(UnaryCP.CAST_AS_MATRIX_OPCODE) ) { - //TODO: simplify converter api to allow long indexes to be passed in MatrixCharacteristics mcOut = new MatrixCharacteristics(mcIn); mcOut.setBlockSize(ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize()); - in = ((JavaPairRDD<Long, FrameBlock>)in).mapToPair(new LongFrameToLongWritableFrameFunction()); out = FrameRDDConverterUtils.binaryBlockToMatrixBlock( - (JavaPairRDD<LongWritable, FrameBlock>)in, mcIn, mcOut); + (JavaPairRDD<Long, FrameBlock>)in, mcIn, mcOut); } else if( opcode.equals(UnaryCP.CAST_AS_FRAME_OPCODE) ) { out = FrameRDDConverterUtils.matrixBlockToBinaryBlockLongIndex(sec.getSparkContext(), http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index c09c1fc..ae77c35 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -25,6 +25,7 @@ import java.util.HashMap; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; @@ -50,9 +51,12 @@ import org.apache.sysml.runtime.instructions.spark.functions.ExtractGroupNWeight import org.apache.sysml.runtime.instructions.spark.functions.PerformGroupByAggInCombiner; import org.apache.sysml.runtime.instructions.spark.functions.PerformGroupByAggInReducer; import org.apache.sysml.runtime.instructions.spark.functions.ReplicateVectorFunction; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.LibMatrixReorg; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; @@ -66,6 +70,9 @@ import org.apache.sysml.runtime.matrix.operators.CMOperator.AggregateOperationTy import org.apache.sysml.runtime.matrix.operators.Operator; import org.apache.sysml.runtime.matrix.operators.SimpleOperator; import 
org.apache.sysml.runtime.transform.DataTransform; +import org.apache.sysml.runtime.transform.encode.Encoder; +import org.apache.sysml.runtime.transform.encode.EncoderFactory; +import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.UtilFunctions; public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction @@ -156,7 +163,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction } else if( opcode.equalsIgnoreCase("rexpand") || opcode.equalsIgnoreCase("replace") - || opcode.equalsIgnoreCase("transform") ) + || opcode.equalsIgnoreCase("transform") + || opcode.equalsIgnoreCase("transformapply")) { func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode); return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false); @@ -169,6 +177,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction @Override + @SuppressWarnings("unchecked") public void processInstruction(ExecutionContext ec) throws DMLRuntimeException { @@ -401,6 +410,36 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction throw new DMLRuntimeException(e); } } + else if ( opcode.equalsIgnoreCase("transformapply") ) + { + //get input RDD and meta data + FrameObject fo = sec.getFrameObject(params.get("target")); + JavaPairRDD<Long,FrameBlock> in = (JavaPairRDD<Long,FrameBlock>) + sec.getRDDHandleForFrameObject(fo, InputInfo.BinaryBlockInputInfo); + FrameBlock meta = sec.getFrameInput(params.get("meta")); + MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(params.get("target")); + MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); + mcOut.setDimension(mcIn.getRows(), mcIn.getCols()); //TODO encoder awareness + + //create encoder broadcast (avoiding replication per task) + Encoder encoder = EncoderFactory.createEncoder(params.get("spec"), + fo.getSchema(), (int)fo.getNumColumns(), meta); + 
Broadcast<Encoder> bmeta = sec.getSparkContext().broadcast(encoder); + + //execute transform apply + JavaPairRDD<Long,FrameBlock> tmp = in + .mapValues(new RDDTransformApplyFunction(bmeta)); + JavaPairRDD<MatrixIndexes,MatrixBlock> out = FrameRDDConverterUtils + .binaryBlockToMatrixBlock(tmp, mcIn, mcOut); + + //set output and maintain lineage/output characteristics + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), params.get("target")); + ec.releaseFrameInput(params.get("meta")); + } + else { + throw new DMLRuntimeException("Unknown parameterized builtin opcode: "+opcode); + } } @@ -642,6 +681,32 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction return new MatrixCell(val); } } + + /** + * + */ + public static class RDDTransformApplyFunction implements Function<FrameBlock,FrameBlock> + { + private static final long serialVersionUID = 5759813006068230916L; + + private Broadcast<Encoder> _bencoder = null; + + public RDDTransformApplyFunction(Broadcast<Encoder> bencoder) { + _bencoder = bencoder; + } + + @Override + public FrameBlock call(FrameBlock in) + throws Exception + { + //execute block transform apply + Encoder encoder = _bencoder.getValue(); + MatrixBlock tmp = encoder.apply(in, new MatrixBlock(in.getNumRows(), in.getNumColumns(), false)); + + //convert to frameblock to reuse frame-matrix reblock + return DataConverter.convertToFrameBlock(tmp); + } + } /** * http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index b2b1a97..274efe0 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -282,7 +282,7 @@ public class FrameRDDConverterUtils * @param strict * @return */ - public static JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockToMatrixBlock(JavaPairRDD<LongWritable,FrameBlock> input, + public static JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockToMatrixBlock(JavaPairRDD<Long,FrameBlock> input, MatrixCharacteristics mcIn, MatrixCharacteristics mcOut) { //convert binary block to matrix block @@ -726,7 +726,7 @@ public class FrameRDDConverterUtils /** * */ - private static class BinaryBlockToMatrixBlockFunction implements PairFlatMapFunction<Tuple2<LongWritable,FrameBlock>,MatrixIndexes, MatrixBlock> + private static class BinaryBlockToMatrixBlockFunction implements PairFlatMapFunction<Tuple2<Long,FrameBlock>,MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = -2654986510471835933L; @@ -739,10 +739,10 @@ public class FrameRDDConverterUtils } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<LongWritable, FrameBlock> arg0) + public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<Long, FrameBlock> arg0) throws Exception { - long rowIndex = arg0._1().get(); + long rowIndex = arg0._1(); FrameBlock blk = arg0._2(); ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java index 7d8c00b..75fe639 100644 --- 
a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java @@ -44,6 +44,20 @@ public class EncoderFactory return createEncoder(spec, Collections.nCopies(clen, ValueType.STRING), meta); } + /** + * + * @param spec + * @param schema + * @param clen + * @param meta + * @return + * @throws DMLRuntimeException + */ + public static Encoder createEncoder(String spec, List<ValueType> schema, int clen, FrameBlock meta) throws DMLRuntimeException { + List<ValueType> lschema = (schema==null) ? Collections.nCopies(clen, ValueType.STRING) : schema; + return createEncoder(spec, lschema, meta); + } + /** * http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/main/java/org/apache/sysml/runtime/util/DataConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java index 846b2b2..08a6e8d 100644 --- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java @@ -902,6 +902,15 @@ public class DataConverter } /** + * + * @param mb + * @return + */ + public static String toString(MatrixBlock mb) { + return toString(mb, false, " ", "\n", mb.getNumRows(), mb.getNumColumns(), 3); + } + + /** * Returns a string representation of a matrix * @param mb * @param sparse if true, string will contain a table with row index, col index, value (where value != 0.0) @@ -977,6 +986,15 @@ public class DataConverter /** * * @param fb + * @return + */ + public static String toString(FrameBlock fb) { + return toString(fb, false, " ", "\n", fb.getNumRows(), fb.getNumColumns(), 3); + } + + /** + * + * @param fb * @param sparse * @param separator * @param lineseparator 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index e271824..7357989 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -39,6 +39,7 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongWritableFrameToLongFrameFunction; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; import org.apache.sysml.runtime.io.FrameWriter; @@ -483,7 +484,9 @@ public class FrameConverterTest extends AutomatedTestBase case BIN2MAT: { InputInfo iinfo = InputInfo.BinaryBlockInputInfo; OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; - JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class); + JavaPairRDD<Long, FrameBlock> rddIn = sc + .hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class) + .mapToPair(new LongWritableFrameToLongFrameFunction()); JavaPairRDD<MatrixIndexes,MatrixBlock> rddOut = FrameRDDConverterUtils.binaryBlockToMatrixBlock(rddIn, mc, mcMatrix); rddOut.saveAsHadoopFile(fnameOut, MatrixIndexes.class, 
MatrixBlock.class, oinfo.outputFormatClass); break; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java new file mode 100644 index 0000000..e8837cc --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.test.integration.functions.transform; + +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.io.MatrixReader; +import org.apache.sysml.runtime.io.MatrixReaderFactory; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class TransformFrameTest extends AutomatedTestBase +{ + private final static String TEST_NAME1 = "Transform"; + private final static String TEST_NAME2 = "ApplyFrame"; + private final static String TEST_DIR = "functions/transform/"; + private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameTest.class.getSimpleName() + "/"; + + private final static String DATASET = "homes3/homes.csv"; + private final static String SPEC1 = "homes3/homes.tfspec_recode.json"; + private final static String SPEC2 = "homes3/homes.tfspec_dummy.json"; + private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; + private final static String SPEC4 = "homes3/homes.tfspec_impute.json"; + private final static String SPEC5 = "homes3/homes.tfspec_omit.json"; + + public enum TransformType { + RECODE, + DUMMY, + BIN, + IMPUTE, + OMIT, + } + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME1, + new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "y" }) ); + } + + @Test + public void testHomesRecodeSingleNodeCSV() { + runTransformTest(RUNTIME_PLATFORM.SINGLE_NODE, "csv", TransformType.RECODE); + } + + @Test + public void testHomesRecodeSparkCSV() { + runTransformTest(RUNTIME_PLATFORM.SPARK, "csv", TransformType.RECODE); + } + + 
/** + * + * @param rt + * @param ofmt + * @param dataset + */ + private void runTransformTest( RUNTIME_PLATFORM rt, String ofmt, TransformType type ) + { + //set runtime platform + RUNTIME_PLATFORM rtold = rtplatform; + boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK; + rtplatform = rt; + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + //set transform specification + String SPEC = null; + switch( type ) { + case RECODE: SPEC = SPEC1; break; + case DUMMY: SPEC = SPEC2; break; + case BIN: SPEC = SPEC3; break; + case IMPUTE: SPEC = SPEC4; break; + case OMIT: SPEC = SPEC5; break; + } + + if( !ofmt.equals("csv") ) + throw new RuntimeException("Unsupported test output format"); + + try + { + getAndLoadTestConfiguration(TEST_NAME1); + + /* This is for running the junit test the new way, i.e., construct the arguments directly */ + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME1 + ".dml"; + programArgs = new String[]{"-nvargs", + "DATA=" + HOME + "input/" + DATASET, + "TFSPEC=" + HOME + "input/" + SPEC, + "TFMTD=" + output("tfmtd"), + "TFDATA=" + output("tfout"), + "OFMT=" + ofmt }; + + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld; + runTest(true, false, null, -1); + + fullDMLScriptName = HOME + TEST_NAME2 + ".dml"; + programArgs = new String[]{"-explain","-nvargs", + "DATA=" + HOME + "input/" + DATASET, + "TFSPEC=" + HOME + "input/" + SPEC, + "APPLYMTD=" + output("tfmtd"), // generated above + "TFDATA=" + output("test_tfout"), + "OFMT=" + ofmt }; + + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true; + runTest(true, false, null, -1); + + //read both outputs and compare + MatrixReader reader1 = MatrixReaderFactory.createMatrixReader(InputInfo.CSVInputInfo); + MatrixBlock mb1 = reader1.readMatrixFromHDFS(output("tfout"), -1, -1, -1, -1, -1); + MatrixReader reader2 = 
MatrixReaderFactory.createMatrixReader(InputInfo.CSVInputInfo); + MatrixBlock mb2 = reader2.readMatrixFromHDFS(output("test_tfout"), -1, -1, -1, -1, -1); + double[][] R1 = DataConverter.convertToDoubleMatrix(mb1); + double[][] R2 = DataConverter.convertToDoubleMatrix(mb2); + TestUtils.compareMatrices(R1, R2, R1.length, R1[0].length, 0); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + finally { + rtplatform = rtold; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/ApplyFrame.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/ApplyFrame.dml b/src/test/scripts/functions/transform/ApplyFrame.dml new file mode 100644 index 0000000..4fbcc36 --- /dev/null +++ b/src/test/scripts/functions/transform/ApplyFrame.dml @@ -0,0 +1,29 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +raw = read($DATA); +jspec = read($TFSPEC, data_type="scalar", value_type="string") + +M = transformmeta(spec=jspec, transformPath=$APPLYMTD); +A = transformapply(target=raw, spec=jspec, meta=M); + +write(A, $TFDATA, format=$OFMT); + http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/input/homes3/homes.csv ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/homes3/homes.csv b/src/test/scripts/functions/transform/input/homes3/homes.csv new file mode 100644 index 0000000..e4706a3 --- /dev/null +++ b/src/test/scripts/functions/transform/input/homes3/homes.csv @@ -0,0 +1,149 @@ +zipcode,district,sqft,numbedrooms,numbathrooms,floors,view,saleprice,askingprice +95141,west,1373,7,1,3,FALSE,695,698 +91312,south,3261,6,2,2,FALSE,902,906 +94555,north,1835,3,3,3,TRUE,888,892 +95141,east,2833,6,2.5,2,TRUE,927,932 +96334,south,2742,6,2.5,2,FALSE,872,876 +96334,north,2195,5,2.5,2,FALSE,799,803 +98755,north,3469,7,2.5,2,FALSE,958,963 +96334,west,1685,7,1.5,2,TRUE,757,760 +95141,west,2238,4,3,3,FALSE,894,899 +91312,west,1245,4,1,1,FALSE,547,549 +98755,south,3702,7,3,1,FALSE,959,964 +98755,north,1865,7,1,2,TRUE,742,745 +94555,north,3837,3,1,1,FALSE,839,842 +91312,west,2139,3,1,3,TRUE,820,824 +95141,north,3824,4,3,1,FALSE,954,958 +98755,east,2858,5,1.5,1,FALSE,759,762 +91312,south,1827,7,3,1,FALSE,735,738 +91312,south,3557,2,2.5,1,FALSE,888,892 +91312,south,2553,2,2.5,2,TRUE,884,889 +96334,west,1682,3,1.5,1,FALSE,625,628 +98755,south,3926,6,2,2,TRUE,1040,1044 +96334,east,2790,5,2.5,3,FALSE,923,927 +95141,west,3928,4,2,3,FALSE,1037,1042 +94555,south,2000,1,3,2,TRUE,844,848 +96334,east,2688,5,2,3,TRUE,938,943 +94555,south,3533,2,1.5,2,FALSE,890,894 +94555,south,2639,3,2,3,FALSE,876,880 +91312,west,2462,4,2.5,1,TRUE,828,832 +94555,north,2420,1,1.5,3,FALSE,809,813 
+96334,west,3911,6,1.5,3,TRUE,1072,1077 +96334,west,2166,6,1.5,3,TRUE,867,872 +94555,south,3855,6,2,1,FALSE,914,918 +91312,west,1971,2,3,1,TRUE,790,794 +95141,east,1769,4,1,1,TRUE,652,655 +98755,north,3774,6,1,2,TRUE,961,966 +96334,west,1044,2,3,1,TRUE,685,688 +95141,north,2561,7,1.5,1,TRUE,790,794 +95141,south,2255,2,2,3,TRUE,883,887 +94555,north,3085,6,2,1,FALSE,819,823 +98755,south,1273,2,1.5,2,FALSE,628,631 +91312,west,3785,5,3,3,TRUE,1133,1138 +91312,west,3270,7,1.5,3,FALSE,943,947 +98755,south,1749,2,2,2,FALSE,712,715 +98755,south,1625,7,1.5,2,FALSE,691,694 +96334,north,3010,7,1.5,2,FALSE,848,851 +91312,south,3919,5,1,3,TRUE,1033,1038 +91312,south,1976,1,2,1,TRUE,726,729 +91312,west,3953,3,1.5,3,FALSE,1006,1010 +95141,west,3439,4,2,2,FALSE,921,925 +94555,east,3570,7,1,2,TRUE,934,938 +98755,west,2484,5,3,2,TRUE,926,931 +94555,south,1349,3,2,2,TRUE,721,724 +98755,west,3967,3,2,3,TRUE,1095,1100 +94555,west,2090,7,3,2,TRUE,885,889 +96334,east,1856,5,2,3,TRUE,841,845 +91312,north,1922,2,3,1,TRUE,775,778 +98755,north,1199,3,3,3,FALSE,761,765 +96334,east,2108,3,1,3,TRUE,806,810 +96334,west,3901,4,2,2,FALSE,976,981 +94555,south,2654,6,1.5,2,TRUE,859,863 +94555,west,3805,6,2,3,TRUE,1085,1090 +95141,south,3199,4,2,3,FALSE,947,951 +98755,west,3786,5,1,1,TRUE,909,913 +94555,east,2160,1,1,1,FALSE,629,631 +95141,east,3152,7,2,1,TRUE,883,887 +94555,east,1592,2,3,2,TRUE,791,795 +95141,east,3903,1,2.5,2,FALSE,976,981 +91312,south,1076,2,2.5,1,FALSE,597,600 +96334,west,1719,1,1.5,3,FALSE,738,742 +94555,north,1439,4,1.5,1,FALSE,589,592 +91312,east,1961,2,3,1,TRUE,775,778 +94555,north,2471,1,1.5,1,TRUE,753,756 +91312,west,3930,4,2.5,2,FALSE,1004,1009 +95141,south,2833,1,1,1,FALSE,718,721 +96334,south,2580,4,1,2,TRUE,816,820 +94555,south,2169,3,2.5,3,TRUE,904,908 +95141,east,3329,4,3,3,TRUE,1064,1069 +96334,south,3392,4,2,3,TRUE,1026,1031 +96334,east,3688,6,2.5,3,FALSE,1032,1037 +98755,west,3347,3,2.5,2,TRUE,991,996 +95141,east,1810,5,1,1,FALSE,606,609 
+95141,east,3753,1,2.5,2,FALSE,959,963 +94555,east,3906,2,1.5,1,FALSE,866,870 +96334,east,1732,3,2,1,TRUE,700,703 +96334,south,2188,4,2,1,TRUE,767,771 +96334,south,3750,6,2,2,FALSE,963,967 +98755,north,2331,1,1.5,1,TRUE,740,743 +94555,north,1512,4,3,3,TRUE,854,858 +98755,north,3352,3,3,3,FALSE,1014,1018 +94555,south,3426,3,2.5,2,FALSE,937,941 +98755,south,3211,5,3,1,TRUE,948,953 +98755,west,2747,2,2.5,1,FALSE,803,806 +96334,east,3952,6,1.5,1,TRUE,946,950 +91312,north,3814,6,1.5,2,FALSE,934,938 +95141,south,3700,7,2.5,1,FALSE,929,933 +95141,east,3154,4,2.5,1,TRUE,898,902 +91312,south,2648,4,1.5,2,FALSE,793,797 +98755,north,1394,4,1.5,1,FALSE,587,590 +91312,west,2709,5,2,2,FALSE,837,841 +94555,east,3946,6,1,2,TRUE,974,978 +91312,north,3905,6,2,2,FALSE,973,977 +98755,east,3248,5,1.5,1,TRUE,860,864 +96334,north,1774,7,1.5,1,FALSE,644,647 +94555,east,1995,2,3,3,TRUE,897,902 +94555,east,2876,2,3,1,FALSE,828,832 +94555,east,3229,4,2,3,TRUE,995,1000 +94555,north,1079,5,2,2,FALSE,638,641 +95141,south,3695,7,2.5,3,FALSE,1046,1051 +96334,west,3694,5,1,1,TRUE,897,901 +98755,west,1918,5,1,2,FALSE,693,697 +94555,south,1647,6,1,2,TRUE,713,716 +96334,west,2691,3,2.5,2,FALSE,858,862 +95141,south,1333,2,2,2,TRUE,716,719 +95141,west,2609,4,2,1,FALSE,765,768 +91312,west,2125,3,1,2,TRUE,760,763 +91312,west,2417,5,1,1,FALSE,689,692 +98755,west,3623,2,1,3,TRUE,995,999 +98755,north,3343,6,3,1,FALSE,908,912 +96334,south,1074,7,2.5,3,FALSE,739,743 +96334,south,2972,3,1,2,TRUE,858,862 +91312,east,1637,2,2,1,FALSE,626,629 +91312,north,1807,2,3,2,FALSE,765,768 +95141,north,1457,2,3,1,FALSE,667,670 +91312,west,3043,6,1,1,FALSE,766,770 +91312,west,3045,6,1.5,3,TRUE,967,972 +98755,north,1980,5,1,1,TRUE,688,691 +98755,west,1112,3,1.5,3,TRUE,732,735 +98755,south,1533,6,1.5,3,FALSE,734,738 +91312,east,1442,5,2,2,FALSE,675,678 +91312,north,3171,6,1,3,TRUE,945,949 +96334,east,3072,5,1.5,2,FALSE,842,846 +94555,east,3506,4,1.5,3,TRUE,1000,1005 +94555,south,1574,2,1,3,FALSE,691,694 
+94555,east,3567,6,3,1,FALSE,926,931 +91312,south,1194,1,1,2,TRUE,637,640 +94555,east,1031,3,1.5,1,FALSE,532,535 +94555,south,2776,3,2.5,2,TRUE,916,920 +91312,south,2009,5,1.5,1,TRUE,719,723 +96334,north,3784,2,1,2,FALSE,889,893 +94555,west,1975,6,1.5,2,FALSE,729,732 +98755,west,2444,2,3,2,FALSE,854,857 +95141,south,1684,3,1.5,3,FALSE,737,740 +98755,north,1729,6,1,1,TRUE,663,666 +95141,north,2236,1,1,2,FALSE,702,705 +95141,south,2061,7,3,1,FALSE,764,768 +98755,south,3561,3,2.5,3,TRUE,1070,1075 +94555,east,2143,3,1,2,FALSE,694,697 +96334,north,3840,7,1,1,FALSE,858,862 http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd b/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd new file mode 100644 index 0000000..c47626a --- /dev/null +++ b/src/test/scripts/functions/transform/input/homes3/homes.csv.mtd @@ -0,0 +1,5 @@ +{ + "data_type": "frame", + "format": "csv", + "header": true, +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b7582331/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json new file mode 100644 index 0000000..c8aa0ea --- /dev/null +++ b/src/test/scripts/functions/transform/input/homes3/homes.tfspec_recode.json @@ -0,0 +1,2 @@ +{ + "ids": true, "recode": [ 1, 2, 7 ] } \ No newline at end of file
