This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 158ccff  [SYSTEMDS-351] New builtin dropInvalid for cleaning by 
expected schema
158ccff is described below

commit 158ccffbadef845058d9f3a2c5084fbb8fa00429
Author: Shafaq Siddiqi <[email protected]>
AuthorDate: Fri Apr 10 20:44:33 2020 +0200

    [SYSTEMDS-351] New builtin dropInvalid for cleaning by expected schema
    
    Closes #883.
---
 docs/Tasks.txt                                     |   3 +
 .../java/org/apache/sysds/common/Builtins.java     |   1 +
 src/main/java/org/apache/sysds/hops/BinaryOp.java  |   3 +-
 src/main/java/org/apache/sysds/hops/Hop.java       |   5 +-
 src/main/java/org/apache/sysds/lops/Binary.java    |  11 +-
 src/main/java/org/apache/sysds/lops/BinaryM.java   |   4 +-
 .../sysds/parser/BuiltinFunctionExpression.java    |  10 +
 .../org/apache/sysds/parser/DMLTranslator.java     |  21 ++-
 .../sysds/runtime/functionobjects/Builtin.java     |   3 +-
 .../runtime/instructions/CPInstructionParser.java  |   5 +-
 .../runtime/instructions/InstructionUtils.java     |   3 +
 .../runtime/instructions/SPInstructionParser.java  |  10 +-
 .../instructions/cp/BinaryCPInstruction.java       |   2 +
 .../cp/BinaryFrameFrameCPInstruction.java          |  47 +++++
 .../runtime/instructions/cp/CPInstruction.java     |   2 +-
 .../spark/BinaryFrameFrameSPInstruction.java       |  84 +++++++++
 .../instructions/spark/BinarySPInstruction.java    |  42 +++--
 .../sysds/runtime/matrix/data/FrameBlock.java      |  36 ++++
 .../apache/sysds/runtime/util/UtilFunctions.java   |   4 +-
 .../functions/frame/FrameIsCorrectTypeTest.java    | 206 +++++++++++++++++++++
 src/test/scripts/functions/frame/DropInvalid.dml   |  25 +++
 21 files changed, 480 insertions(+), 47 deletions(-)

diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index d1168e6..5fdd96d 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -250,5 +250,8 @@ SYSTEMDS-340 Compiler Assisted Lineage Caching and Reuse
  * 344 Unmark functions/SBs containing non-determinism for caching
  * 345 Compiler assisted cache configuration
 
+SYSTEMDS-350 Data Cleaning Framework
+ * 351 New builtin function for error correction by schema            OK
+
 Others:
  * Break append instruction to cbind and rbind 
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java 
b/src/main/java/org/apache/sysds/common/Builtins.java
index 7220198..4f20d87 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -83,6 +83,7 @@ public enum Builtins {
        DETECTSCHEMA("detectSchema", false),
        DIAG("diag", false),
        DISCOVER_FD("discoverFD", true),
+       DROP_INVALID("dropInvalid", false),
        EIGEN("eigen", false, ReturnType.MULTI_RETURN),
        EXISTS("exists", false),
        EXP("exp", false),
diff --git a/src/main/java/org/apache/sysds/hops/BinaryOp.java 
b/src/main/java/org/apache/sysds/hops/BinaryOp.java
index 769329b..8212b53 100644
--- a/src/main/java/org/apache/sysds/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/BinaryOp.java
@@ -894,7 +894,8 @@ public class BinaryOp extends MultiThreadedHop
 
        private static MMBinaryMethod optFindMMBinaryMethodSpark(Hop left, Hop 
right) {
                // TODO size information for tensor
-               if (left._dataType == DataType.TENSOR && right._dataType == 
DataType.TENSOR)
+               if ((left._dataType == DataType.TENSOR && right._dataType == 
DataType.TENSOR)
+                       || (left._dataType == DataType.FRAME && right._dataType 
== DataType.FRAME))
                        return MMBinaryMethod.MR_BINARY_R;
                long m1_dim1 = left.getDim1();
                long m1_dim2 = left.getDim2();
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java 
b/src/main/java/org/apache/sysds/hops/Hop.java
index 01c61c9..f64b5f0 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -1057,6 +1057,7 @@ public abstract class Hop implements ParseInfo
                LOG_NZ, //sparse-safe log; ppred(X,0,"!=")*log(X,0.5)
                MINUS1_MULT, //1-X*Y
                BITWAND, BITWOR, BITWXOR, BITWSHIFTL, BITWSHIFTR, //bitwise 
operations
+               DROP_INVALID, // frame operation for removing cells invalid wrt 
given data type
        }
 
        public static final HashMap<Hop.OpOp2, Binary.OperationTypes> 
HopsOpOp2LopsB;
@@ -1088,6 +1089,7 @@ public abstract class Hop implements ParseInfo
                HopsOpOp2LopsB.put(OpOp2.BITWXOR, Binary.OperationTypes.BW_XOR);
                HopsOpOp2LopsB.put(OpOp2.BITWSHIFTL, 
Binary.OperationTypes.BW_SHIFTL);
                HopsOpOp2LopsB.put(OpOp2.BITWSHIFTR, 
Binary.OperationTypes.BW_SHIFTR);
+               HopsOpOp2LopsB.put(OpOp2.DROP_INVALID, 
Binary.OperationTypes.DROP_INVALID);
        }
 
        protected static final HashMap<Hop.OpOp2, BinaryScalar.OperationTypes> 
HopsOpOp2LopsBS;
@@ -1320,7 +1322,8 @@ public abstract class Hop implements ParseInfo
                HopsOpOp2String.put(OpOp2.BITWXOR, "bitwXor");
                HopsOpOp2String.put(OpOp2.BITWSHIFTL, "bitwShiftL");
                HopsOpOp2String.put(OpOp2.BITWSHIFTR, "bitwShiftR");
-               
+               HopsOpOp2String.put(OpOp2.DROP_INVALID, "dropInvalid");
+
                HopsStringOpOp2 = new HashMap<>();
                for( Entry<OpOp2,String> e : HopsOpOp2String.entrySet() )
                        HopsStringOpOp2.put(e.getValue(), e.getKey());
diff --git a/src/main/java/org/apache/sysds/lops/Binary.java 
b/src/main/java/org/apache/sysds/lops/Binary.java
index 0de7607..9bda9fa 100644
--- a/src/main/java/org/apache/sysds/lops/Binary.java
+++ b/src/main/java/org/apache/sysds/lops/Binary.java
@@ -37,6 +37,7 @@ public class Binary extends Lop
                AND, OR, XOR,
                MAX, MIN, POW, SOLVE, NOTSUPPORTED,
                BW_AND, BW_OR, BW_XOR, BW_SHIFTL, BW_SHIFTR, //Bitwise 
operations
+               DROP_INVALID,
        }
 
        private OperationTypes operation;
@@ -115,9 +116,9 @@ public class Binary extends Lop
                case DIVIDE:
                        return "/";
                case MODULUS:
-                       return "%%";    
+                       return "%%";
                case INTDIV:
-                       return "%/%";           
+                       return "%/%";
                case MATMULT:
                        return "ba+*";
                case MINUS1_MULTIPLY:
@@ -167,6 +168,9 @@ public class Binary extends Lop
                        
                case SOLVE:
                        return "solve";
+
+               case DROP_INVALID:
+                       return "dropInvalid";
                        
                default:
                        throw new UnsupportedOperationException("Instruction is 
not defined for Binary operation: " + op);
@@ -200,7 +204,6 @@ public class Binary extends Lop
                        sb.append( OPERAND_DELIMITOR );
                        sb.append( isRightTransposed );
                }
-               
                return sb.toString();
        }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sysds/lops/BinaryM.java 
b/src/main/java/org/apache/sysds/lops/BinaryM.java
index 1cd8af8..50207c2 100644
--- a/src/main/java/org/apache/sysds/lops/BinaryM.java
+++ b/src/main/java/org/apache/sysds/lops/BinaryM.java
@@ -109,7 +109,7 @@ public class BinaryM extends Lop
                case INTDIV:
                        return "map%/%";
                case MINUS1_MULTIPLY:
-                       return "map1-*";        
+                       return "map1-*";
                
                /* Relational */
                case LESS_THAN:
@@ -139,7 +139,7 @@ public class BinaryM extends Lop
                        return "mapmax";
                case POW:
                        return "map^";
-                       
+               
                default:
                        throw new UnsupportedOperationException("Instruction is 
not defined for Binary operation: " + op);
                }
diff --git 
a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java 
b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index 459d70b..cfda652 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -1519,6 +1519,16 @@ public class BuiltinFunctionExpression extends 
DataIdentifier
                        output.setBlocksize(0);
                        break;
 
+               case DROP_INVALID:
+                       checkNumParameters(2);
+                       checkMatrixFrameParam(getFirstExpr());
+                       checkMatrixFrameParam(getSecondExpr());
+                       output.setDataType(DataType.FRAME);
+                       output.setDimensions(id.getDim1(), id.getDim2());
+                       output.setBlocksize (id.getBlocksize());
+                       output.setValueType(ValueType.STRING);
+                       break;
+
                default:
                        if( isMathFunction() ) {
                                checkMathFunctionParam();
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index 15db26e..0b92e18 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -2434,7 +2434,6 @@ public class DMLTranslator
                        currBuiltinOp = new BinaryOp(target.getName(), 
target.getDataType(),
                                target.getValueType(), 
OpOp2.valueOf(source.getOpCode().name()), expr, expr2);
                        break;
-
                case ABS:
                case SIN:
                case COS:
@@ -2462,6 +2461,10 @@ public class DMLTranslator
                        currBuiltinOp = new UnaryOp(target.getName(), 
target.getDataType(), target.getValueType(),
                                OpOp1.valueOf(source.getOpCode().name()), expr);
                        break;
+               case DROP_INVALID:
+                       currBuiltinOp = new BinaryOp(target.getName(), 
target.getDataType(),
+                               target.getValueType(), 
OpOp2.valueOf(source.getOpCode().name()), expr, expr2);
+                       break;
                
                case LOG:
                                if (expr2 == null) {
@@ -2472,11 +2475,11 @@ public class DMLTranslator
                                                break;
                                        default:
                                                throw new 
ParseException(source.printErrorLocation() +
-                                                               
"processBuiltinFunctionExpression():: Could not find Operation type for builtin 
function: "
-                                                                               
+ source.getOpCode());
+                                                       
"processBuiltinFunctionExpression():: Could not find Operation type for builtin 
function: "
+                                                       + source.getOpCode());
                                        }
-                                       currBuiltinOp = new 
UnaryOp(target.getName(), target.getDataType(), target.getValueType(), mathOp2,
-                                                       expr);
+                                       currBuiltinOp = new 
UnaryOp(target.getName(),
+                                               target.getDataType(), 
target.getValueType(), mathOp2, expr);
                                } else {
                                        Hop.OpOp2 mathOp3;
                                        switch (source.getOpCode()) {
@@ -2485,8 +2488,8 @@ public class DMLTranslator
                                                break;
                                        default:
                                                throw new 
ParseException(source.printErrorLocation() +
-                                                               
"processBuiltinFunctionExpression():: Could not find Operation type for builtin 
function: "
-                                                                               
+ source.getOpCode());
+                                                       
"processBuiltinFunctionExpression():: Could not find Operation type for builtin 
function: "
+                                                               + 
source.getOpCode());
                                        }
                                        currBuiltinOp = new 
BinaryOp(target.getName(), target.getDataType(), target.getValueType(), mathOp3,
                                                        expr, expr2);
@@ -2527,12 +2530,12 @@ public class DMLTranslator
                        currBuiltinOp = new DataGenOp(OpOpDG.TIME, target);
                        break;
                        
-               case SAMPLE: 
+               case SAMPLE:
                {
                        Expression[] in = source.getAllExpr();
                        
                        // arguments: range/size/replace/seed; defaults: 
replace=FALSE
-                               
+                       
                        HashMap<String,Hop> tmpparams = new HashMap<>();
                        tmpparams.put(DataExpression.RAND_MAX, expr); //range
                        tmpparams.put(DataExpression.RAND_ROWS, expr2);
diff --git 
a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java 
b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
index 729eac5..8cac05c 100644
--- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
+++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
@@ -50,7 +50,7 @@ public class Builtin extends ValueFunction
        public enum BuiltinCode { SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS, 
ATAN, LOG, LOG_NZ, MIN,
                MAX, ABS, SIGN, SQRT, EXP, PLOGP, PRINT, PRINTF, NROW, NCOL, 
LENGTH, LINEAGE, ROUND, MAXINDEX, MININDEX,
                STOP, CEIL, FLOOR, CUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD, 
INVERSE, SPROP, SIGMOID, EVAL, LIST,
-               TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF }
+               TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID }
        public BuiltinCode bFunc;
        
        private static final boolean FASTMATH = true;
@@ -102,6 +102,7 @@ public class Builtin extends ValueFunction
                String2BuiltinCode.put( "isna", BuiltinCode.ISNA);
                String2BuiltinCode.put( "isnan", BuiltinCode.ISNAN);
                String2BuiltinCode.put( "isinf", BuiltinCode.ISINF);
+               String2BuiltinCode.put( "dropInvalid", 
BuiltinCode.DROP_INVALID);
        }
        
        private Builtin(BuiltinCode bf) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java 
b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
index 70a2ea0..c613222 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
@@ -149,7 +149,8 @@ public class CPInstructionParser extends InstructionParser
                String2CPInstructionType.put( "solve"  , CPType.Binary);
                String2CPInstructionType.put( "max"  , CPType.Binary);
                String2CPInstructionType.put( "min"  , CPType.Binary);
-               
+               String2CPInstructionType.put( "dropInvalid"  , CPType.Binary);
+
                String2CPInstructionType.put( "nmax", CPType.BuiltinNary);
                String2CPInstructionType.put( "nmin", CPType.BuiltinNary);
                String2CPInstructionType.put( "n+"  , CPType.BuiltinNary);
@@ -335,7 +336,7 @@ public class CPInstructionParser extends InstructionParser
                        
                        case Unary:
                                return UnaryCPInstruction.parseInstruction(str);
-                       
+
                        case Binary:
                                return 
BinaryCPInstruction.parseInstruction(str);
                        
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java 
b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 8926c0e..c59c350 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -515,6 +515,7 @@ public class InstructionUtils
                                new BinaryOperator( 
Builtin.getBuiltinFnObject(opcode))) :
                        (matrixScalar ? parseScalarBinaryOperator(opcode, 
in1.getDataType().isScalar()) :
                                parseExtendedBinaryOperator(opcode));
+
        }
        
        public static BinaryOperator parseBinaryOperator(String opcode) 
@@ -571,6 +572,8 @@ public class InstructionUtils
                        return new 
BinaryOperator(Builtin.getBuiltinFnObject("max"));
                else if ( opcode.equalsIgnoreCase("min") ) 
                        return new 
BinaryOperator(Builtin.getBuiltinFnObject("min"));
+               else if( opcode.equalsIgnoreCase("dropInvalid"))
+                       return new 
BinaryOperator(Builtin.getBuiltinFnObject("dropInvalid"));
                
                throw new RuntimeException("Unknown binary opcode " + opcode);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java 
b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
index e7ee43a..5891d30 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
@@ -86,8 +86,7 @@ import 
org.apache.sysds.runtime.instructions.spark.UnaryMatrixSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.WriteSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.ZipmmSPInstruction;
 
-
-public class SPInstructionParser extends InstructionParser 
+public class SPInstructionParser extends InstructionParser
 {      
        public static final HashMap<String, SPType> String2SPInstructionType;
        static {
@@ -133,7 +132,8 @@ public class SPInstructionParser extends InstructionParser
                String2SPInstructionType.put( "pmm"        , SPType.PMM);
                String2SPInstructionType.put( "zipmm"      , SPType.ZIPMM);
                String2SPInstructionType.put( "pmapmm"     , SPType.PMAPMM);
-               
+
+
                String2SPInstructionType.put( "uaggouterchain", 
SPType.UaggOuterChain);
                
                //ternary aggregate operators
@@ -177,7 +177,7 @@ public class SPInstructionParser extends InstructionParser
                String2SPInstructionType.put( "map^"    , SPType.Binary);
                String2SPInstructionType.put( "map+*"   , SPType.Binary);
                String2SPInstructionType.put( "map-*"   , SPType.Binary);
-               
+               String2SPInstructionType.put( "dropInvalid", SPType.Binary);
                // Relational Instruction Opcodes 
                String2SPInstructionType.put( "=="   , SPType.Binary);
                String2SPInstructionType.put( "!="   , SPType.Binary);
@@ -390,7 +390,7 @@ public class SPInstructionParser extends InstructionParser
                                
                        case Binary:
                                return 
BinarySPInstruction.parseInstruction(str);
-                       
+
                        case Ternary:
                                return 
TernarySPInstruction.parseInstruction(str);
                        
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
index d73eda1..1e4e7f6 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
@@ -52,6 +52,8 @@ public abstract class BinaryCPInstruction extends 
ComputationCPInstruction {
                        return new BinaryMatrixMatrixCPInstruction(operator, 
in1, in2, out, opcode, str);
                else if (in1.getDataType() == DataType.TENSOR && 
in2.getDataType() == DataType.TENSOR)
                        return new BinaryTensorTensorCPInstruction(operator, 
in1, in2, out, opcode, str);
+               else if (in1.getDataType() == DataType.FRAME && 
in2.getDataType() == DataType.FRAME)
+                       return new BinaryFrameFrameCPInstruction(operator, in1, 
in2, out, opcode, str);
                else
                        return new BinaryMatrixScalarCPInstruction(operator, 
in1, in2, out, opcode, str);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
new file mode 100644
index 0000000..ba0bfe1
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.cp;
+
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameFrameCPInstruction extends BinaryCPInstruction
+{
+       protected BinaryFrameFrameCPInstruction(Operator op, CPOperand in1,
+               CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(CPType.Binary, op, in1, in2, out, opcode, istr);
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               // Read input frames
+               FrameBlock inBlock1 = ec.getFrameInput(input1.getName());
+               FrameBlock inBlock2 = ec.getFrameInput(input2.getName());
+
+               // Perform computation using input frames, and produce the 
result frame
+               FrameBlock retBlock = inBlock1.dropInvalid(inBlock2);
+               // Release the memory occupied by input frames
+               ec.releaseFrameInput(input1.getName());
+               ec.releaseFrameInput(input2.getName());
+               // Attach result frame with FrameBlock associated with 
output_name
+               ec.setFrameOutput(output.getName(), retBlock);
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
index d420e10..1e60eea 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
@@ -35,7 +35,7 @@ public abstract class CPInstruction extends Instruction
 {
        public enum CPType {
                AggregateUnary, AggregateBinary, AggregateTernary,
-               Unary, Binary, Ternary, Quaternary, BuiltinNary, Ctable, 
+               Unary, Binary, Ternary, Quaternary, BuiltinNary, Ctable,
                MultiReturnParameterizedBuiltin, ParameterizedBuiltin, 
MultiReturnBuiltin,
                Builtin, Reorg, Variable, External, Append, Rand, QSort, QPick,
                MatrixIndexing, MMTSJ, PMMJ, MMChain, Reshape, Partition, 
Compression, SpoofFused,
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
new file mode 100644
index 0000000..263abf3
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
+       protected BinaryFrameFrameSPInstruction(Operator op, CPOperand in1, 
CPOperand in2, CPOperand out, String opcode, String istr) {
+               super(SPType.Binary, op, in1, in2, out, opcode, istr);
+       }
+
+       public static BinarySPInstruction parseInstruction ( String str) {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields (parts, 3);
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand out = new CPOperand(parts[3]);
+               Types.DataType dt1 = in1.getDataType();
+               Types.DataType dt2 = in2.getDataType();
+               Operator operator = 
InstructionUtils.parseBinaryOrBuiltinOperator(opcode, in1, in2);
+               if(dt1 == Types.DataType.FRAME && dt2 == Types.DataType.FRAME)
+                       return new BinaryFrameFrameSPInstruction(operator, in1, 
in2, out, opcode, str);
+               else
+                       throw new DMLRuntimeException("Frame binary operation 
not yet implemented for frame-scalar, or frame-matrix");
+       }
+
+       @Override
+       public void processInstruction(ExecutionContext ec) {
+               SparkExecutionContext sec = (SparkExecutionContext)ec;
+               // Get input RDDs
+               JavaPairRDD<Long, FrameBlock> in1 = 
sec.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
+               // get schema frame-block
+               Broadcast<FrameBlock> fb = 
sec.getSparkContext().broadcast(sec.getFrameInput(input2.getName()));
+               JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new 
isCorrectbySchema(fb.getValue()));
+               //release input frame
+               sec.releaseFrameInput(input2.getName());
+               //set output RDD
+               sec.setRDDHandleForVariable(output.getName(), out);
+               sec.addLineageRDD(output.getName(), input1.getName());
+       }
+
+       private static class isCorrectbySchema implements 
Function<FrameBlock,FrameBlock> {
+               private static final long serialVersionUID = 
5850400295183766400L;
+
+               private FrameBlock schema_frame = null;
+
+               public isCorrectbySchema(FrameBlock fb_name ) {
+                       schema_frame = fb_name;
+               }
+
+               @Override
+               public FrameBlock call(FrameBlock arg0) throws Exception {
+                       return arg0.dropInvalid(schema_frame);
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
index 4a81bb3..dc4e09b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
@@ -75,6 +75,7 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                        vtype = VectorType.valueOf(parts[5]);
                        isBroadcast = true;
                }
+
                else {
                        opcode = parseBinaryInstruction(str, in1, in2, out);
                }
@@ -104,6 +105,10 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                        else
                                throw new DMLRuntimeException("Tensor binary 
operation not yet implemented for tensor-scalar, or tensor-matrix");
                }
+               else if( dt1 == DataType.FRAME || dt2 == DataType.FRAME ) {
+                       return 
BinaryFrameFrameSPInstruction.parseInstruction(str);
+               }
+
                return null;
        }
 
@@ -204,14 +209,14 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                                in2 = in2.flatMapToPair(new 
ReplicateTensorFunction(i, numReps));
                }
                int numPrefPart = SparkUtils.isHashPartitioned(in1) ? 
in1.getNumPartitions() :
-                               SparkUtils.isHashPartitioned(in2) ? 
in2.getNumPartitions() :
-                                               Math.min(in1.getNumPartitions() 
+ in2.getNumPartitions(),
-                                                               2 * 
SparkUtils.getNumPreferredPartitions(dcOut));
+                       SparkUtils.isHashPartitioned(in2) ? 
in2.getNumPartitions() :
+                       Math.min(in1.getNumPartitions() + 
in2.getNumPartitions(),
+                               2 * 
SparkUtils.getNumPreferredPartitions(dcOut));
 
                //execute binary operation
                JavaPairRDD<TensorIndexes, TensorBlock> out = in1
-                               .join(in2, numPrefPart)
-                               .mapValues(new 
TensorTensorBinaryOpFunction(bop));
+                       .join(in2, numPrefPart)
+                       .mapValues(new TensorTensorBinaryOpFunction(bop));
 
                //set output RDD
                sec.setRDDHandleForVariable(output.getName(), out);
@@ -227,7 +232,7 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                checkMatrixMatrixBinaryCharacteristics(sec);
 
                //get input RDDs
-               String rddVar = input1.getName(); 
+               String rddVar = input1.getName();
                String bcastVar = input2.getName();
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryMatrixBlockRDDHandleForVariable( rddVar );
                PartitionedBroadcast<MatrixBlock> in2 = 
sec.getBroadcastForVariable( bcastVar );
@@ -248,7 +253,7 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                        //why we cannot use mapValues is the need for broadcast 
key lookups.
                        //alternative: out = in1.mapToPair(new 
MatrixVectorBinaryOpFunction(bop, in2, vtype));
                        out = in1.mapPartitionsToPair(
-                                       new 
MatrixVectorBinaryOpPartitionFunction(bop, in2, vtype), true);
+                               new MatrixVectorBinaryOpPartitionFunction(bop, 
in2, vtype), true);
                }
                
                //set output RDD
@@ -282,7 +287,7 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                JavaPairRDD<TensorIndexes, TensorBlock> out;
                // TODO less dims broadcast variable
                out = in1.mapPartitionsToPair(
-                               new TensorTensorBinaryOpPartitionFunction(bop, 
in2, replicateDim), true);
+                       new TensorTensorBinaryOpPartitionFunction(bop, in2, 
replicateDim), true);
 
                //set output RDD
                updateBinaryTensorOutputDataCharacteristics(sec);
@@ -307,7 +312,7 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                
                //execute scalar matrix arithmetic instruction
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1.mapValues( new 
MatrixScalarUnaryFunction(sc_op) );
-                       
+               
                //put output RDD handle into symbol table
                updateUnaryOutputDataCharacteristics(sec, rddVar, 
output.getName());
                sec.setRDDHandleForVariable(output.getName(), out);
@@ -356,15 +361,12 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                }
        }
 
-       protected long getNumReplicas(DataCharacteristics mc1, 
DataCharacteristics mc2, boolean left)
-       {
-               if( left ) 
-               {
+       protected long getNumReplicas(DataCharacteristics mc1, 
DataCharacteristics mc2, boolean left) {
+               if( left ) {
                        if(mc1.getCols()==1 ) //outer
                                return mc2.getNumColBlocks();
                }
-               else
-               {
+               else {
                        if(mc2.getRows()==1 && mc1.getRows()>1) //outer, row 
vector
                                return mc1.getNumRowBlocks();
                        else if( mc2.getCols()==1 && mc1.getCols()>1 ) //col 
vector
@@ -388,7 +390,7 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                //check for unknown input dimensions
                if( !(mc1.dimsKnown() && mc2.dimsKnown()) ){
                        throw new DMLRuntimeException("Unknown dimensions 
matrix-matrix binary operations: "
-                                       + "[" + mc1.getRows() + "x" + 
mc1.getCols()  + " vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
+                               + "[" + mc1.getRows() + "x" + mc1.getCols()  + 
" vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
                }
                
                //check for dimension mismatch
@@ -398,13 +400,13 @@ public abstract class BinarySPInstruction extends 
ComputationSPInstruction {
                        && !(mc1.getCols()==1 && mc2.getRows()==1) )     
//outer colvector-rowvector 
                {
                        throw new DMLRuntimeException("Dimensions mismatch 
matrix-matrix binary operations: "
-                                       + "[" + mc1.getRows() + "x" + 
mc1.getCols()  + " vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
-               }       
+                               + "[" + mc1.getRows() + "x" + mc1.getCols()  + 
" vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
+               }
                
                if(mc1.getBlocksize() != mc2.getBlocksize()) {
                        throw new DMLRuntimeException("Blocksize mismatch 
matrix-matrix binary operations: "
-                                       + "[" + mc1.getBlocksize() + "x" + 
mc1.getBlocksize()  + " vs " + mc2.getBlocksize() + "x" + mc2.getBlocksize() + 
"]");
-               }       
+                               + "[" + mc1.getBlocksize() + "x" + 
mc1.getBlocksize()  + " vs " + mc2.getBlocksize() + "x" + mc2.getBlocksize() + 
"]");
+               }
        }
 
        protected void 
checkTensorTensorBinaryCharacteristics(SparkExecutionContext sec)
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 70a0a38..7456cc4 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.matrix.data;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.io.Writable;
+import org.apache.sysds.api.DMLException;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
@@ -1829,6 +1830,41 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                return fb;
        }
 
+       public FrameBlock dropInvalid(FrameBlock schema) {
+               //sanity checks
+               if(this.getNumColumns() != schema.getNumColumns())
+                       throw new DMLException("mismatch in number of columns 
in frame and its schema ");
+
+               String[] schemaString = schema.getStringRowIterator().next(); 
// extract the schema in String array
+               for (int i = 0; i < this.getNumColumns(); i++) {
+                       Array obj = this.getColumn(i);
+                       for (int j = 0; j < this.getNumRows(); j++)
+                       {
+                               if(obj.get(j) == null)
+                                       continue;
+                               String dataValue = 
obj.get(j).toString().trim().replace("\"", "").toLowerCase() ;
+
+                               ValueType dataType = isType(dataValue);
+                                if (dataType== ValueType.FP64 && 
schemaString[i].trim().equals("FP32")) {
+                                        double maxValue = 
Double.parseDouble(dataValue);
+                                        if ((maxValue < (-Float.MAX_VALUE)) || 
(maxValue > Float.MAX_VALUE))
+                                                this.set(j,i,null);
+                               }
+                               else if (dataType== ValueType.INT64 && 
schemaString[i].trim().equals("INT32")) {
+                                        long maxValue = 
Long.parseLong(dataValue);
+                                        if ((maxValue < Integer.MIN_VALUE) || 
(maxValue > Integer.MAX_VALUE))
+                                                this.set(j,i,null);
+                               }
+                               else if(dataType == ValueType.BOOLEAN && 
schemaString[i].trim().equals("INT32")
+                                                && 
((Integer.parseInt(dataValue) == 1 || Integer.parseInt(dataValue) == 0)))
+                                       continue;
+                               else if 
(!dataType.toString().equals(schemaString[i].trim()))
+                                       this.set(j,i,null);
+                       }
+               }
+               return this;
+       }
+
        public static FrameBlock mergeSchema(FrameBlock temp1, FrameBlock 
temp2) {
                String[] rowTemp1 = temp1.getStringRowIterator().next();
                String[] rowTemp2 = temp2.getStringRowIterator().next();
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index 4c3e1c5..1185fd2 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -464,6 +464,7 @@ public class UtilFunctions
                        case INT32:   return Integer.parseInt(in);
                        case INT64:   return Long.parseLong(in);
                        case FP64:    return Double.parseDouble(in);
+                       case FP32:    return Float.parseFloat(in);
                        default: throw new RuntimeException("Unsupported value 
type: "+vt);
                }
        }
@@ -516,7 +517,8 @@ public class UtilFunctions
        }
 
        public static Object objectToObject(ValueType vt, Object in) {
-               if( in instanceof Double && vt == ValueType.FP64 
+               if( in instanceof Double && vt == ValueType.FP64
+                       || in instanceof Float && vt == ValueType.FP32
                        || in instanceof Long && vt == ValueType.INT64
                        || in instanceof Integer && vt == ValueType.INT32
                        || in instanceof Boolean && vt == ValueType.BOOLEAN
diff --git 
a/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
new file mode 100644
index 0000000..fe84c0b
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.frame;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.InputInfo;
+import org.apache.sysds.runtime.matrix.data.OutputInfo;
+import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class FrameIsCorrectTypeTest extends AutomatedTestBase
+{
+       private final static String TEST_NAME = "DropInvalid";
+       private final static String TEST_DIR = "functions/frame/";
+       private static final String TEST_CLASS_DIR = TEST_DIR + 
FrameIsCorrectTypeTest.class.getSimpleName() + "/";
+
+       private final static int rows = 20;
+       private final static ValueType[] schemaStrings = {ValueType.FP64, 
ValueType.STRING};
+
+       public static void init() {
+               TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+       }
+
+       public static void cleanUp() {
+               if (TEST_CACHE_ENABLED) {
+                       TestUtils.clearDirectory(TEST_DATA_DIR + 
TEST_CLASS_DIR);
+               }
+       }
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+               if (TEST_CACHE_ENABLED) {
+                       setOutAndExpectedDeletionDisabled(true);
+               }
+       }
+
+       @Test
+       public void testDoubleinStringCP() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
1, LopProperties.ExecType.CP);
+       }
+
+       @Test
+       public void testDoubleinStringSpark() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
1, LopProperties.ExecType.SPARK);
+       }
+
+       @Test
+       public void testStringInDouble() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
2, LopProperties.ExecType.CP);
+       }
+
+       @Test
+       public void testStringInDoubleSpark() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 
2, LopProperties.ExecType.SPARK);
+       }
+
+       @Test
+       public void testDoubleInFloat() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
3, LopProperties.ExecType.CP);
+       }
+
+       @Test
+       public void testDoubleInFloatSpark() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
3, LopProperties.ExecType.SPARK);
+       }
+
+       @Test
+       public void testLongInInt() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
4, LopProperties.ExecType.CP);
+       }
+
+       @Test
+       public void testLongInIntSpark() {
+               runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 
4, LopProperties.ExecType.SPARK);
+       }
+       private void runIsCorrectTest(ValueType[] schema, int rows, int cols,
+               int badValues, int test, LopProperties.ExecType et)
+       {
+               Types.ExecMode platformOld = setExecMode(et);
+               boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               try {
+                       getAndLoadTestConfiguration(TEST_NAME);
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + TEST_NAME + ".dml";
+                       programArgs = new String[] {"-args", input("A"), 
input("M"),
+                               String.valueOf(rows), Integer.toString(cols), 
output("B")};
+                       FrameBlock frame1 = new FrameBlock(schema);
+                       FrameWriter writer = 
FrameWriterFactory.createFrameWriter(OutputInfo.CSVOutputInfo);
+                       FrameBlock frame2 = new 
FrameBlock(UtilFunctions.nCopies(cols, Types.ValueType.STRING));
+                       String[] meta = new String[]{"FP64", "STRING"};
+
+                       initFrameDataString(frame1); // initialize a frame with 
one column
+
+                       switch (test) { //Double in String
+                               case 1:
+                                       String[] S = new String[rows];
+                                       Arrays.fill(S, "string_value");
+                                       for (int i = 0; i < badValues; i++)
+                                               S[i] = "0.1345672225";
+                                       frame1.appendColumn(S);
+                                       break;
+
+                               case 2: { // String in double
+                                       double[][] D = getRandomMatrix(rows, 1, 
1, 10, 0.7, 2373);
+                                       String[] tmp1 = new String[rows];
+                                       for (int i = 0; i < rows; i++)
+                                               tmp1[i] = (String) 
UtilFunctions.doubleToObject(ValueType.STRING, D[i][0], false);
+                                       frame1.appendColumn(tmp1);
+                                       for (int i = 0; i < badValues; i++)
+                                               frame1.set(i, 1, 
"string_value");
+                                       meta[meta.length - 1] = "FP64";
+                                       break;
+                               }
+                               case 3: {//Double in float
+                                       double[][] D = getRandomMatrix(rows, 1, 
1, 10, 0.7, 2373);
+                                       String[] tmp1 = new String[rows];
+                                       for (int i = 0; i < rows; i++)
+                                               tmp1[i] = (String) 
UtilFunctions.doubleToObject(ValueType.STRING, D[i][0], false);
+                                       frame1.appendColumn(tmp1);
+                                       for (int i = 0; i < badValues; i++)
+                                               frame1.set(i, 1,  
"1234567890123456768E40");
+                                       meta[meta.length - 1] = "FP32";
+                                       break;
+                               }
+                               case 4: { // long in int
+                                       String[] tmp1 = new String[rows];
+                                       for (int i = 0; i < rows; i++)
+                                               tmp1[i] = String.valueOf(i);
+                                       for (int i = 0; i < badValues; i++)
+                                               tmp1[i] = "12345678910111212";
+                                       frame1.appendColumn(tmp1);
+                                       meta[meta.length - 1] = "INT32";
+                                       break;
+                               }
+                       }
+                       writer.writeFrameToHDFS(
+                               frame1.slice(0, rows - 1, 0, 1, new 
FrameBlock()),
+                               input("A"), rows, schema.length);
+
+                       frame2.appendRow(meta);
+                       writer.writeFrameToHDFS(frame2, input("M"), 1, 
schema.length);
+                       runTest(true, false, null, -1);
+                       FrameBlock frameout = readDMLFrameFromHDFS("B", 
InputInfo.BinaryBlockInputInfo);
+
+                       //read output data and compare results
+                       ArrayList<Object> data = new ArrayList<>();
+                       for (int i = 0; i < frameout.getNumRows(); i++)
+                               data.add(frameout.get(i, 1));
+
+                       int nullNum = Math.toIntExact(data.stream().filter(s -> 
s == null).count());
+                       //verify output schema
+                       Assert.assertEquals("Wrong result: " + nullNum + ".", 
badValues, nullNum);
+               }
+               catch (Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlag;
+                       OptimizerUtils.ALLOW_AUTO_VECTORIZATION = true;
+                       OptimizerUtils.ALLOW_OPERATOR_FUSION = true;
+               }
+       }
+       private  void initFrameDataString(FrameBlock frame1) {
+               double[][] A = getRandomMatrix(rows, 1, Float.MAX_VALUE, 
Double.MAX_VALUE, 0.7, 2373);
+               double[] tmp6 = new double[rows];
+               for (int i = 0; i < rows; i++)
+                       tmp6[i] = (Double) 
UtilFunctions.doubleToObject(ValueType.FP64, A[i][0], false);
+               frame1.appendColumn(tmp6);
+       }
+}
diff --git a/src/test/scripts/functions/frame/DropInvalid.dml 
b/src/test/scripts/functions/frame/DropInvalid.dml
new file mode 100644
index 0000000..38d2436
--- /dev/null
+++ b/src/test/scripts/functions/frame/DropInvalid.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1, rows=$3, cols=$4, data_type="frame", format="csv");
+M = read($2, rows=1, cols=$4, data_type="frame", format="csv");
+R = dropInvalid(X,M);
+write(R, $5, format="binary");

Reply via email to