This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new 158ccff [SYSTEMDS-351] New builtin dropInvalid for cleaning by
expected schema
158ccff is described below
commit 158ccffbadef845058d9f3a2c5084fbb8fa00429
Author: Shafaq Siddiqi <[email protected]>
AuthorDate: Fri Apr 10 20:44:33 2020 +0200
[SYSTEMDS-351] New builtin dropInvalid for cleaning by expected schema
Closes #883.
---
docs/Tasks.txt | 3 +
.../java/org/apache/sysds/common/Builtins.java | 1 +
src/main/java/org/apache/sysds/hops/BinaryOp.java | 3 +-
src/main/java/org/apache/sysds/hops/Hop.java | 5 +-
src/main/java/org/apache/sysds/lops/Binary.java | 11 +-
src/main/java/org/apache/sysds/lops/BinaryM.java | 4 +-
.../sysds/parser/BuiltinFunctionExpression.java | 10 +
.../org/apache/sysds/parser/DMLTranslator.java | 21 ++-
.../sysds/runtime/functionobjects/Builtin.java | 3 +-
.../runtime/instructions/CPInstructionParser.java | 5 +-
.../runtime/instructions/InstructionUtils.java | 3 +
.../runtime/instructions/SPInstructionParser.java | 10 +-
.../instructions/cp/BinaryCPInstruction.java | 2 +
.../cp/BinaryFrameFrameCPInstruction.java | 47 +++++
.../runtime/instructions/cp/CPInstruction.java | 2 +-
.../spark/BinaryFrameFrameSPInstruction.java | 84 +++++++++
.../instructions/spark/BinarySPInstruction.java | 42 +++--
.../sysds/runtime/matrix/data/FrameBlock.java | 36 ++++
.../apache/sysds/runtime/util/UtilFunctions.java | 4 +-
.../functions/frame/FrameIsCorrectTypeTest.java | 206 +++++++++++++++++++++
src/test/scripts/functions/frame/DropInvalid.dml | 25 +++
21 files changed, 480 insertions(+), 47 deletions(-)
diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index d1168e6..5fdd96d 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -250,5 +250,8 @@ SYSTEMDS-340 Compiler Assisted Lineage Caching and Reuse
* 344 Unmark functions/SBs containing non-determinism for caching
* 345 Compiler assisted cache configuration
+SYSTEMDS-350 Data Cleaning Framework
+ * 351 New builtin function for error correction by schema OK
+
Others:
* Break append instruction to cbind and rbind
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java
b/src/main/java/org/apache/sysds/common/Builtins.java
index 7220198..4f20d87 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -83,6 +83,7 @@ public enum Builtins {
DETECTSCHEMA("detectSchema", false),
DIAG("diag", false),
DISCOVER_FD("discoverFD", true),
+ DROP_INVALID("dropInvalid", false),
EIGEN("eigen", false, ReturnType.MULTI_RETURN),
EXISTS("exists", false),
EXP("exp", false),
diff --git a/src/main/java/org/apache/sysds/hops/BinaryOp.java
b/src/main/java/org/apache/sysds/hops/BinaryOp.java
index 769329b..8212b53 100644
--- a/src/main/java/org/apache/sysds/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/BinaryOp.java
@@ -894,7 +894,8 @@ public class BinaryOp extends MultiThreadedHop
private static MMBinaryMethod optFindMMBinaryMethodSpark(Hop left, Hop
right) {
// TODO size information for tensor
- if (left._dataType == DataType.TENSOR && right._dataType ==
DataType.TENSOR)
+ if ((left._dataType == DataType.TENSOR && right._dataType ==
DataType.TENSOR)
+ || (left._dataType == DataType.FRAME && right._dataType
== DataType.FRAME))
return MMBinaryMethod.MR_BINARY_R;
long m1_dim1 = left.getDim1();
long m1_dim2 = left.getDim2();
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java
b/src/main/java/org/apache/sysds/hops/Hop.java
index 01c61c9..f64b5f0 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -1057,6 +1057,7 @@ public abstract class Hop implements ParseInfo
LOG_NZ, //sparse-safe log; ppred(X,0,"!=")*log(X,0.5)
MINUS1_MULT, //1-X*Y
BITWAND, BITWOR, BITWXOR, BITWSHIFTL, BITWSHIFTR, //bitwise
operations
+ DROP_INVALID, // frame operation for removing cells invalid wrt
given data type
}
public static final HashMap<Hop.OpOp2, Binary.OperationTypes>
HopsOpOp2LopsB;
@@ -1088,6 +1089,7 @@ public abstract class Hop implements ParseInfo
HopsOpOp2LopsB.put(OpOp2.BITWXOR, Binary.OperationTypes.BW_XOR);
HopsOpOp2LopsB.put(OpOp2.BITWSHIFTL,
Binary.OperationTypes.BW_SHIFTL);
HopsOpOp2LopsB.put(OpOp2.BITWSHIFTR,
Binary.OperationTypes.BW_SHIFTR);
+ HopsOpOp2LopsB.put(OpOp2.DROP_INVALID,
Binary.OperationTypes.DROP_INVALID);
}
protected static final HashMap<Hop.OpOp2, BinaryScalar.OperationTypes>
HopsOpOp2LopsBS;
@@ -1320,7 +1322,8 @@ public abstract class Hop implements ParseInfo
HopsOpOp2String.put(OpOp2.BITWXOR, "bitwXor");
HopsOpOp2String.put(OpOp2.BITWSHIFTL, "bitwShiftL");
HopsOpOp2String.put(OpOp2.BITWSHIFTR, "bitwShiftR");
-
+ HopsOpOp2String.put(OpOp2.DROP_INVALID, "dropInvalid");
+
HopsStringOpOp2 = new HashMap<>();
for( Entry<OpOp2,String> e : HopsOpOp2String.entrySet() )
HopsStringOpOp2.put(e.getValue(), e.getKey());
diff --git a/src/main/java/org/apache/sysds/lops/Binary.java
b/src/main/java/org/apache/sysds/lops/Binary.java
index 0de7607..9bda9fa 100644
--- a/src/main/java/org/apache/sysds/lops/Binary.java
+++ b/src/main/java/org/apache/sysds/lops/Binary.java
@@ -37,6 +37,7 @@ public class Binary extends Lop
AND, OR, XOR,
MAX, MIN, POW, SOLVE, NOTSUPPORTED,
BW_AND, BW_OR, BW_XOR, BW_SHIFTL, BW_SHIFTR, //Bitwise
operations
+ DROP_INVALID,
}
private OperationTypes operation;
@@ -115,9 +116,9 @@ public class Binary extends Lop
case DIVIDE:
return "/";
case MODULUS:
- return "%%";
+ return "%%";
case INTDIV:
- return "%/%";
+ return "%/%";
case MATMULT:
return "ba+*";
case MINUS1_MULTIPLY:
@@ -167,6 +168,9 @@ public class Binary extends Lop
case SOLVE:
return "solve";
+
+ case DROP_INVALID:
+ return "dropInvalid";
default:
throw new UnsupportedOperationException("Instruction is
not defined for Binary operation: " + op);
@@ -200,7 +204,6 @@ public class Binary extends Lop
sb.append( OPERAND_DELIMITOR );
sb.append( isRightTransposed );
}
-
return sb.toString();
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sysds/lops/BinaryM.java
b/src/main/java/org/apache/sysds/lops/BinaryM.java
index 1cd8af8..50207c2 100644
--- a/src/main/java/org/apache/sysds/lops/BinaryM.java
+++ b/src/main/java/org/apache/sysds/lops/BinaryM.java
@@ -109,7 +109,7 @@ public class BinaryM extends Lop
case INTDIV:
return "map%/%";
case MINUS1_MULTIPLY:
- return "map1-*";
+ return "map1-*";
/* Relational */
case LESS_THAN:
@@ -139,7 +139,7 @@ public class BinaryM extends Lop
return "mapmax";
case POW:
return "map^";
-
+
default:
throw new UnsupportedOperationException("Instruction is
not defined for Binary operation: " + op);
}
diff --git
a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
index 459d70b..cfda652 100644
--- a/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -1519,6 +1519,16 @@ public class BuiltinFunctionExpression extends
DataIdentifier
output.setBlocksize(0);
break;
+ case DROP_INVALID:
+ checkNumParameters(2);
+ checkMatrixFrameParam(getFirstExpr());
+ checkMatrixFrameParam(getSecondExpr());
+ output.setDataType(DataType.FRAME);
+ output.setDimensions(id.getDim1(), id.getDim2());
+ output.setBlocksize (id.getBlocksize());
+ output.setValueType(ValueType.STRING);
+ break;
+
default:
if( isMathFunction() ) {
checkMathFunctionParam();
diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index 15db26e..0b92e18 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -2434,7 +2434,6 @@ public class DMLTranslator
currBuiltinOp = new BinaryOp(target.getName(),
target.getDataType(),
target.getValueType(),
OpOp2.valueOf(source.getOpCode().name()), expr, expr2);
break;
-
case ABS:
case SIN:
case COS:
@@ -2462,6 +2461,10 @@ public class DMLTranslator
currBuiltinOp = new UnaryOp(target.getName(),
target.getDataType(), target.getValueType(),
OpOp1.valueOf(source.getOpCode().name()), expr);
break;
+ case DROP_INVALID:
+ currBuiltinOp = new BinaryOp(target.getName(),
target.getDataType(),
+ target.getValueType(),
OpOp2.valueOf(source.getOpCode().name()), expr, expr2);
+ break;
case LOG:
if (expr2 == null) {
@@ -2472,11 +2475,11 @@ public class DMLTranslator
break;
default:
throw new
ParseException(source.printErrorLocation() +
-
"processBuiltinFunctionExpression():: Could not find Operation type for builtin
function: "
-
+ source.getOpCode());
+
"processBuiltinFunctionExpression():: Could not find Operation type for builtin
function: "
+ + source.getOpCode());
}
- currBuiltinOp = new
UnaryOp(target.getName(), target.getDataType(), target.getValueType(), mathOp2,
- expr);
+ currBuiltinOp = new
UnaryOp(target.getName(),
+ target.getDataType(),
target.getValueType(), mathOp2, expr);
} else {
Hop.OpOp2 mathOp3;
switch (source.getOpCode()) {
@@ -2485,8 +2488,8 @@ public class DMLTranslator
break;
default:
throw new
ParseException(source.printErrorLocation() +
-
"processBuiltinFunctionExpression():: Could not find Operation type for builtin
function: "
-
+ source.getOpCode());
+
"processBuiltinFunctionExpression():: Could not find Operation type for builtin
function: "
+ +
source.getOpCode());
}
currBuiltinOp = new
BinaryOp(target.getName(), target.getDataType(), target.getValueType(), mathOp3,
expr, expr2);
@@ -2527,12 +2530,12 @@ public class DMLTranslator
currBuiltinOp = new DataGenOp(OpOpDG.TIME, target);
break;
- case SAMPLE:
+ case SAMPLE:
{
Expression[] in = source.getAllExpr();
// arguments: range/size/replace/seed; defaults:
replace=FALSE
-
+
HashMap<String,Hop> tmpparams = new HashMap<>();
tmpparams.put(DataExpression.RAND_MAX, expr); //range
tmpparams.put(DataExpression.RAND_ROWS, expr2);
diff --git
a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
index 729eac5..8cac05c 100644
--- a/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
+++ b/src/main/java/org/apache/sysds/runtime/functionobjects/Builtin.java
@@ -50,7 +50,7 @@ public class Builtin extends ValueFunction
public enum BuiltinCode { SIN, COS, TAN, SINH, COSH, TANH, ASIN, ACOS,
ATAN, LOG, LOG_NZ, MIN,
MAX, ABS, SIGN, SQRT, EXP, PLOGP, PRINT, PRINTF, NROW, NCOL,
LENGTH, LINEAGE, ROUND, MAXINDEX, MININDEX,
STOP, CEIL, FLOOR, CUMSUM, CUMPROD, CUMMIN, CUMMAX, CUMSUMPROD,
INVERSE, SPROP, SIGMOID, EVAL, LIST,
- TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF }
+ TYPEOF, DETECTSCHEMA, ISNA, ISNAN, ISINF, DROP_INVALID }
public BuiltinCode bFunc;
private static final boolean FASTMATH = true;
@@ -102,6 +102,7 @@ public class Builtin extends ValueFunction
String2BuiltinCode.put( "isna", BuiltinCode.ISNA);
String2BuiltinCode.put( "isnan", BuiltinCode.ISNAN);
String2BuiltinCode.put( "isinf", BuiltinCode.ISINF);
+ String2BuiltinCode.put( "dropInvalid",
BuiltinCode.DROP_INVALID);
}
private Builtin(BuiltinCode bf) {
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
index 70a2ea0..c613222 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
@@ -149,7 +149,8 @@ public class CPInstructionParser extends InstructionParser
String2CPInstructionType.put( "solve" , CPType.Binary);
String2CPInstructionType.put( "max" , CPType.Binary);
String2CPInstructionType.put( "min" , CPType.Binary);
-
+ String2CPInstructionType.put( "dropInvalid" , CPType.Binary);
+
String2CPInstructionType.put( "nmax", CPType.BuiltinNary);
String2CPInstructionType.put( "nmin", CPType.BuiltinNary);
String2CPInstructionType.put( "n+" , CPType.BuiltinNary);
@@ -335,7 +336,7 @@ public class CPInstructionParser extends InstructionParser
case Unary:
return UnaryCPInstruction.parseInstruction(str);
-
+
case Binary:
return
BinaryCPInstruction.parseInstruction(str);
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
index 8926c0e..c59c350 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/InstructionUtils.java
@@ -515,6 +515,7 @@ public class InstructionUtils
new BinaryOperator(
Builtin.getBuiltinFnObject(opcode))) :
(matrixScalar ? parseScalarBinaryOperator(opcode,
in1.getDataType().isScalar()) :
parseExtendedBinaryOperator(opcode));
+
}
public static BinaryOperator parseBinaryOperator(String opcode)
@@ -571,6 +572,8 @@ public class InstructionUtils
return new
BinaryOperator(Builtin.getBuiltinFnObject("max"));
else if ( opcode.equalsIgnoreCase("min") )
return new
BinaryOperator(Builtin.getBuiltinFnObject("min"));
+ else if( opcode.equalsIgnoreCase("dropInvalid"))
+ return new
BinaryOperator(Builtin.getBuiltinFnObject("dropInvalid"));
throw new RuntimeException("Unknown binary opcode " + opcode);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
index e7ee43a..5891d30 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
@@ -86,8 +86,7 @@ import
org.apache.sysds.runtime.instructions.spark.UnaryMatrixSPInstruction;
import org.apache.sysds.runtime.instructions.spark.WriteSPInstruction;
import org.apache.sysds.runtime.instructions.spark.ZipmmSPInstruction;
-
-public class SPInstructionParser extends InstructionParser
+public class SPInstructionParser extends InstructionParser
{
public static final HashMap<String, SPType> String2SPInstructionType;
static {
@@ -133,7 +132,8 @@ public class SPInstructionParser extends InstructionParser
String2SPInstructionType.put( "pmm" , SPType.PMM);
String2SPInstructionType.put( "zipmm" , SPType.ZIPMM);
String2SPInstructionType.put( "pmapmm" , SPType.PMAPMM);
-
+
+
String2SPInstructionType.put( "uaggouterchain",
SPType.UaggOuterChain);
//ternary aggregate operators
@@ -177,7 +177,7 @@ public class SPInstructionParser extends InstructionParser
String2SPInstructionType.put( "map^" , SPType.Binary);
String2SPInstructionType.put( "map+*" , SPType.Binary);
String2SPInstructionType.put( "map-*" , SPType.Binary);
-
+ String2SPInstructionType.put( "dropInvalid", SPType.Binary);
// Relational Instruction Opcodes
String2SPInstructionType.put( "==" , SPType.Binary);
String2SPInstructionType.put( "!=" , SPType.Binary);
@@ -390,7 +390,7 @@ public class SPInstructionParser extends InstructionParser
case Binary:
return
BinarySPInstruction.parseInstruction(str);
-
+
case Ternary:
return
TernarySPInstruction.parseInstruction(str);
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
index d73eda1..1e4e7f6 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryCPInstruction.java
@@ -52,6 +52,8 @@ public abstract class BinaryCPInstruction extends
ComputationCPInstruction {
return new BinaryMatrixMatrixCPInstruction(operator,
in1, in2, out, opcode, str);
else if (in1.getDataType() == DataType.TENSOR &&
in2.getDataType() == DataType.TENSOR)
return new BinaryTensorTensorCPInstruction(operator,
in1, in2, out, opcode, str);
+ else if (in1.getDataType() == DataType.FRAME &&
in2.getDataType() == DataType.FRAME)
+ return new BinaryFrameFrameCPInstruction(operator, in1,
in2, out, opcode, str);
else
return new BinaryMatrixScalarCPInstruction(operator,
in1, in2, out, opcode, str);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
new file mode 100644
index 0000000..ba0bfe1
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.cp;
+
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+/**
+ * CP instruction for binary operations whose two inputs are frames.
+ * The first operand is the data frame and the second operand is the
+ * schema frame handed to {@link FrameBlock#dropInvalid(FrameBlock)}.
+ */
+public class BinaryFrameFrameCPInstruction extends BinaryCPInstruction
+{
+	protected BinaryFrameFrameCPInstruction(Operator op, CPOperand in1,
+		CPOperand in2, CPOperand out, String opcode, String istr) {
+		super(CPType.Binary, op, in1, in2, out, opcode, istr);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		final String dataVar = input1.getName();
+		final String schemaVar = input2.getName();
+
+		// acquire both input frames
+		FrameBlock data = ec.getFrameInput(dataVar);
+		FrameBlock schema = ec.getFrameInput(schemaVar);
+
+		// compute the result frame from data and schema
+		FrameBlock result = data.dropInvalid(schema);
+
+		// release the inputs, then bind the result to the output variable
+		ec.releaseFrameInput(dataVar);
+		ec.releaseFrameInput(schemaVar);
+		ec.setFrameOutput(output.getName(), result);
+	}
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
index d420e10..1e60eea 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
@@ -35,7 +35,7 @@ public abstract class CPInstruction extends Instruction
{
public enum CPType {
AggregateUnary, AggregateBinary, AggregateTernary,
- Unary, Binary, Ternary, Quaternary, BuiltinNary, Ctable,
+ Unary, Binary, Ternary, Quaternary, BuiltinNary, Ctable,
MultiReturnParameterizedBuiltin, ParameterizedBuiltin,
MultiReturnBuiltin,
Builtin, Reorg, Variable, External, Append, Rand, QSort, QPick,
MatrixIndexing, MMTSJ, PMMJ, MMChain, Reshape, Partition,
Compression, SpoofFused,
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
new file mode 100644
index 0000000..263abf3
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+/**
+ * Spark instruction for binary operations whose two inputs are frames.
+ * The first operand is a distributed data frame (RDD of frame blocks); the
+ * second operand is a small schema frame that is broadcast to all executors
+ * and applied block-wise via {@link FrameBlock#dropInvalid(FrameBlock)}.
+ */
+public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
+	protected BinaryFrameFrameSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, String opcode, String istr) {
+		super(SPType.Binary, op, in1, in2, out, opcode, istr);
+	}
+
+	/**
+	 * Parses a frame-frame binary instruction string.
+	 *
+	 * @param str instruction string (opcode, in1, in2, out)
+	 * @return the parsed instruction
+	 * @throws DMLRuntimeException if either operand is not a frame
+	 */
+	public static BinarySPInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		InstructionUtils.checkNumFields(parts, 3);
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand in2 = new CPOperand(parts[2]);
+		CPOperand out = new CPOperand(parts[3]);
+		Types.DataType dt1 = in1.getDataType();
+		Types.DataType dt2 = in2.getDataType();
+		Operator operator = InstructionUtils.parseBinaryOrBuiltinOperator(opcode, in1, in2);
+		if(dt1 == Types.DataType.FRAME && dt2 == Types.DataType.FRAME)
+			return new BinaryFrameFrameSPInstruction(operator, in1, in2, out, opcode, str);
+		else
+			throw new DMLRuntimeException("Frame binary operation not yet implemented for frame-scalar, or frame-matrix");
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		SparkExecutionContext sec = (SparkExecutionContext)ec;
+		// get input RDD of the data frame
+		JavaPairRDD<Long, FrameBlock> in1 = sec.getFrameBinaryBlockRDDHandleForVariable(input1.getName());
+		// broadcast the schema frame; pass the broadcast handle (not its value) into
+		// the closure so the frame is shipped once per executor, not once per task
+		Broadcast<FrameBlock> schema = sec.getSparkContext().broadcast(sec.getFrameInput(input2.getName()));
+		JavaPairRDD<Long, FrameBlock> out = in1.mapValues(new DropInvalidFunction(schema));
+		//release input frame
+		sec.releaseFrameInput(input2.getName());
+		//set output RDD
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(output.getName(), input1.getName());
+	}
+
+	/**
+	 * Applies {@link FrameBlock#dropInvalid(FrameBlock)} to every frame block,
+	 * reading the schema frame from a broadcast variable on the executor side.
+	 */
+	private static class DropInvalidFunction implements Function<FrameBlock,FrameBlock> {
+		private static final long serialVersionUID = 5850400295183766400L;
+
+		private final Broadcast<FrameBlock> _schema;
+
+		public DropInvalidFunction(Broadcast<FrameBlock> schema) {
+			_schema = schema;
+		}
+
+		@Override
+		public FrameBlock call(FrameBlock arg0) throws Exception {
+			// dereference the broadcast lazily, on the executor
+			return arg0.dropInvalid(_schema.getValue());
+		}
+	}
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
index 4a81bb3..dc4e09b 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinarySPInstruction.java
@@ -75,6 +75,7 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
vtype = VectorType.valueOf(parts[5]);
isBroadcast = true;
}
+
else {
opcode = parseBinaryInstruction(str, in1, in2, out);
}
@@ -104,6 +105,10 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
else
throw new DMLRuntimeException("Tensor binary
operation not yet implemented for tensor-scalar, or tensor-matrix");
}
+ else if( dt1 == DataType.FRAME || dt2 == DataType.FRAME ) {
+ return
BinaryFrameFrameSPInstruction.parseInstruction(str);
+ }
+
return null;
}
@@ -204,14 +209,14 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
in2 = in2.flatMapToPair(new
ReplicateTensorFunction(i, numReps));
}
int numPrefPart = SparkUtils.isHashPartitioned(in1) ?
in1.getNumPartitions() :
- SparkUtils.isHashPartitioned(in2) ?
in2.getNumPartitions() :
- Math.min(in1.getNumPartitions()
+ in2.getNumPartitions(),
- 2 *
SparkUtils.getNumPreferredPartitions(dcOut));
+ SparkUtils.isHashPartitioned(in2) ?
in2.getNumPartitions() :
+ Math.min(in1.getNumPartitions() +
in2.getNumPartitions(),
+ 2 *
SparkUtils.getNumPreferredPartitions(dcOut));
//execute binary operation
JavaPairRDD<TensorIndexes, TensorBlock> out = in1
- .join(in2, numPrefPart)
- .mapValues(new
TensorTensorBinaryOpFunction(bop));
+ .join(in2, numPrefPart)
+ .mapValues(new TensorTensorBinaryOpFunction(bop));
//set output RDD
sec.setRDDHandleForVariable(output.getName(), out);
@@ -227,7 +232,7 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
checkMatrixMatrixBinaryCharacteristics(sec);
//get input RDDs
- String rddVar = input1.getName();
+ String rddVar = input1.getName();
String bcastVar = input2.getName();
JavaPairRDD<MatrixIndexes,MatrixBlock> in1 =
sec.getBinaryMatrixBlockRDDHandleForVariable( rddVar );
PartitionedBroadcast<MatrixBlock> in2 =
sec.getBroadcastForVariable( bcastVar );
@@ -248,7 +253,7 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
//why we cannot use mapValues is the need for broadcast
key lookups.
//alternative: out = in1.mapToPair(new
MatrixVectorBinaryOpFunction(bop, in2, vtype));
out = in1.mapPartitionsToPair(
- new
MatrixVectorBinaryOpPartitionFunction(bop, in2, vtype), true);
+ new MatrixVectorBinaryOpPartitionFunction(bop,
in2, vtype), true);
}
//set output RDD
@@ -282,7 +287,7 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
JavaPairRDD<TensorIndexes, TensorBlock> out;
// TODO less dims broadcast variable
out = in1.mapPartitionsToPair(
- new TensorTensorBinaryOpPartitionFunction(bop,
in2, replicateDim), true);
+ new TensorTensorBinaryOpPartitionFunction(bop, in2,
replicateDim), true);
//set output RDD
updateBinaryTensorOutputDataCharacteristics(sec);
@@ -307,7 +312,7 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
//execute scalar matrix arithmetic instruction
JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1.mapValues( new
MatrixScalarUnaryFunction(sc_op) );
-
+
//put output RDD handle into symbol table
updateUnaryOutputDataCharacteristics(sec, rddVar,
output.getName());
sec.setRDDHandleForVariable(output.getName(), out);
@@ -356,15 +361,12 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
}
}
- protected long getNumReplicas(DataCharacteristics mc1,
DataCharacteristics mc2, boolean left)
- {
- if( left )
- {
+ protected long getNumReplicas(DataCharacteristics mc1,
DataCharacteristics mc2, boolean left) {
+ if( left ) {
if(mc1.getCols()==1 ) //outer
return mc2.getNumColBlocks();
}
- else
- {
+ else {
if(mc2.getRows()==1 && mc1.getRows()>1) //outer, row
vector
return mc1.getNumRowBlocks();
else if( mc2.getCols()==1 && mc1.getCols()>1 ) //col
vector
@@ -388,7 +390,7 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
//check for unknown input dimensions
if( !(mc1.dimsKnown() && mc2.dimsKnown()) ){
throw new DMLRuntimeException("Unknown dimensions
matrix-matrix binary operations: "
- + "[" + mc1.getRows() + "x" +
mc1.getCols() + " vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
+ + "[" + mc1.getRows() + "x" + mc1.getCols() +
" vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
}
//check for dimension mismatch
@@ -398,13 +400,13 @@ public abstract class BinarySPInstruction extends
ComputationSPInstruction {
&& !(mc1.getCols()==1 && mc2.getRows()==1) )
//outer colvector-rowvector
{
throw new DMLRuntimeException("Dimensions mismatch
matrix-matrix binary operations: "
- + "[" + mc1.getRows() + "x" +
mc1.getCols() + " vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
- }
+ + "[" + mc1.getRows() + "x" + mc1.getCols() +
" vs " + mc2.getRows() + "x" + mc2.getCols() + "]");
+ }
if(mc1.getBlocksize() != mc2.getBlocksize()) {
throw new DMLRuntimeException("Blocksize mismatch
matrix-matrix binary operations: "
- + "[" + mc1.getBlocksize() + "x" +
mc1.getBlocksize() + " vs " + mc2.getBlocksize() + "x" + mc2.getBlocksize() +
"]");
- }
+ + "[" + mc1.getBlocksize() + "x" +
mc1.getBlocksize() + " vs " + mc2.getBlocksize() + "x" + mc2.getBlocksize() +
"]");
+ }
}
protected void
checkTensorTensorBinaryCharacteristics(SparkExecutionContext sec)
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 70a0a38..7456cc4 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.matrix.data;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.io.Writable;
+import org.apache.sysds.api.DMLException;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
@@ -1829,6 +1830,41 @@ public class FrameBlock implements Writable, CacheBlock,
Externalizable
return fb;
}
+	/**
+	 * Sets to null every cell whose detected value type does not match the
+	 * expected value type of its column. Expected types are read from the
+	 * first row of the schema frame as strings (e.g. "FP64", "INT32").
+	 *
+	 * This frame is left unmodified. The cleaning is applied to a deep copy,
+	 * because callers pass in cached inputs (frames obtained from the
+	 * execution context, or blocks of a Spark RDD) that must not be mutated.
+	 *
+	 * @param schema frame whose first row holds one expected value type per column
+	 * @return a new frame with all invalid cells set to null
+	 * @throws DMLException if the schema is empty or its number of columns
+	 *         differs from this frame's
+	 */
+	public FrameBlock dropInvalid(FrameBlock schema) {
+		//sanity checks
+		if(getNumColumns() != schema.getNumColumns())
+			throw new DMLException("mismatch in number of columns in frame and its schema "
+				+ getNumColumns() + " != " + schema.getNumColumns());
+		if(schema.getNumRows() < 1)
+			throw new DMLException("empty schema frame, expected one row of value types");
+
+		// clean a deep copy so the (potentially cached) input stays intact
+		FrameBlock ret = new FrameBlock(this);
+		String[] schemaString = schema.getStringRowIterator().next(); // extract the schema in String array
+		for(int i = 0; i < ret.getNumColumns(); i++) {
+			Array obj = ret.getColumn(i);
+			String expected = schemaString[i].trim(); // loop-invariant per column
+			for(int j = 0; j < ret.getNumRows(); j++) {
+				if(obj.get(j) == null)
+					continue;
+				String dataValue = obj.get(j).toString().trim().replace("\"", "").toLowerCase();
+				ValueType dataType = isType(dataValue);
+				// NOTE(review): a value whose detected type is narrower than the expected
+				// one (e.g. INT32 detected in an INT64 column) reaches the final branch
+				// and is dropped; confirm this matches isType and the intended semantics.
+				if(dataType == ValueType.FP64 && expected.equals("FP32")) {
+					// FP32 column: keep doubles only if they fit the float range
+					double value = Double.parseDouble(dataValue);
+					if(value < -Float.MAX_VALUE || value > Float.MAX_VALUE)
+						ret.set(j, i, null);
+				}
+				else if(dataType == ValueType.INT64 && expected.equals("INT32")) {
+					// INT32 column: keep longs only if they fit the int range
+					long value = Long.parseLong(dataValue);
+					if(value < Integer.MIN_VALUE || value > Integer.MAX_VALUE)
+						ret.set(j, i, null);
+				}
+				else if(dataType == ValueType.BOOLEAN && expected.equals("INT32") && isIntZeroOrOne(dataValue))
+					continue; // numeric 0/1 are valid INT32 values even if detected as BOOLEAN
+				else if(!dataType.toString().equals(expected))
+					ret.set(j, i, null);
+			}
+		}
+		return ret;
+	}
+
+	/**
+	 * Checks whether a string parses as the integer 0 or 1. Non-numeric tokens
+	 * (e.g. "true") return false instead of throwing NumberFormatException.
+	 */
+	private static boolean isIntZeroOrOne(String val) {
+		try {
+			int v = Integer.parseInt(val);
+			return v == 0 || v == 1;
+		}
+		catch(NumberFormatException ex) {
+			return false;
+		}
+	}
+
public static FrameBlock mergeSchema(FrameBlock temp1, FrameBlock
temp2) {
String[] rowTemp1 = temp1.getStringRowIterator().next();
String[] rowTemp2 = temp2.getStringRowIterator().next();
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index 4c3e1c5..1185fd2 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -464,6 +464,7 @@ public class UtilFunctions
case INT32: return Integer.parseInt(in);
case INT64: return Long.parseLong(in);
case FP64: return Double.parseDouble(in);
+ case FP32: return Float.parseFloat(in);
default: throw new RuntimeException("Unsupported value
type: "+vt);
}
}
@@ -516,7 +517,8 @@ public class UtilFunctions
}
public static Object objectToObject(ValueType vt, Object in) {
- if( in instanceof Double && vt == ValueType.FP64
+ if( in instanceof Double && vt == ValueType.FP64
+ || in instanceof Float && vt == ValueType.FP32
|| in instanceof Long && vt == ValueType.INT64
|| in instanceof Integer && vt == ValueType.INT32
|| in instanceof Boolean && vt == ValueType.BOOLEAN
diff --git
a/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
b/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
new file mode 100644
index 0000000..fe84c0b
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/functions/frame/FrameIsCorrectTypeTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.frame;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.InputInfo;
+import org.apache.sysds.runtime.matrix.data.OutputInfo;
+import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Tests the dropInvalid builtin: a two-column frame is written together with a one-row
+ * schema frame, DropInvalid.dml is executed, and the number of cells of the second column
+ * that were replaced by null is compared with the number of injected invalid values.
+ */
+public class FrameIsCorrectTypeTest extends AutomatedTestBase
+{
+	private final static String TEST_NAME = "DropInvalid";
+	private final static String TEST_DIR = "functions/frame/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + FrameIsCorrectTypeTest.class.getSimpleName() + "/";
+
+	private final static int rows = 20;
+	private final static ValueType[] schemaStrings = {ValueType.FP64, ValueType.STRING};
+
+	/** Clears leftover test data once, before any test of this class runs. */
+	@BeforeClass
+	public static void init() {
+		TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+	}
+
+	/** Clears the test data once all tests of this class ran, if the test cache is enabled. */
+	@AfterClass
+	public static void cleanUp() {
+		if (TEST_CACHE_ENABLED) {
+			TestUtils.clearDirectory(TEST_DATA_DIR + TEST_CLASS_DIR);
+		}
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"}));
+		if (TEST_CACHE_ENABLED) {
+			setOutAndExpectedDeletionDisabled(true);
+		}
+	}
+
+	@Test
+	public void testDoubleinStringCP() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testDoubleinStringSpark() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.SPARK);
+	}
+
+	@Test
+	public void testStringInDouble() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 2, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testStringInDoubleSpark() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 2, LopProperties.ExecType.SPARK);
+	}
+
+	@Test
+	public void testDoubleInFloat() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testDoubleInFloatSpark() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.SPARK);
+	}
+
+	@Test
+	public void testLongInInt() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.CP);
+	}
+
+	@Test
+	public void testLongInIntSpark() {
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.SPARK);
+	}
+
+	/**
+	 * Builds a two-column input frame whose second column holds {@code badValues} leading
+	 * entries that violate the expected schema, runs DropInvalid.dml on it, and asserts that
+	 * the second output column contains exactly {@code badValues} nulls.
+	 *
+	 * @param schema    value types used to create the input frame
+	 * @param rows      number of rows of the input frame
+	 * @param cols      number of columns of the input frame and of the schema frame
+	 * @param badValues number of leading rows of column 1 that hold invalid values
+	 * @param test      scenario id: 1 = double in string, 2 = string in double,
+	 *                  3 = double out of FP32 range, 4 = long out of INT32 range
+	 * @param et        execution type (CP or SPARK)
+	 * @throws RuntimeException wrapping any failure, including an unknown scenario id
+	 */
+	private void runIsCorrectTest(ValueType[] schema, int rows, int cols,
+		int badValues, int test, LopProperties.ExecType et)
+	{
+		Types.ExecMode platformOld = setExecMode(et);
+		boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		try {
+			getAndLoadTestConfiguration(TEST_NAME);
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[] {"-args", input("A"), input("M"),
+				Integer.toString(rows), Integer.toString(cols), output("B")};
+			FrameBlock frame1 = new FrameBlock(schema);
+			FrameWriter writer = FrameWriterFactory.createFrameWriter(OutputInfo.CSVOutputInfo);
+			FrameBlock frame2 = new FrameBlock(UtilFunctions.nCopies(cols, Types.ValueType.STRING));
+			//expected schema per column; the last entry is overridden by some scenarios below
+			String[] meta = new String[]{"FP64", "STRING"};
+
+			initFrameDataString(frame1); //column 0: FP64 values
+
+			//column 1: values of the expected type, with the first badValues rows invalid
+			switch (test) {
+				case 1: { //double in string
+					String[] S = new String[rows];
+					Arrays.fill(S, "string_value");
+					for (int i = 0; i < badValues; i++)
+						S[i] = "0.1345672225";
+					frame1.appendColumn(S);
+					break;
+				}
+				case 2: { //string in double
+					appendRandomDoubleColumn(frame1, rows);
+					for (int i = 0; i < badValues; i++)
+						frame1.set(i, 1, "string_value");
+					meta[meta.length - 1] = "FP64";
+					break;
+				}
+				case 3: { //double (out of FP32 range) in float
+					appendRandomDoubleColumn(frame1, rows);
+					for (int i = 0; i < badValues; i++)
+						frame1.set(i, 1, "1234567890123456768E40");
+					meta[meta.length - 1] = "FP32";
+					break;
+				}
+				case 4: { //long (out of INT32 range) in int
+					String[] tmp1 = new String[rows];
+					for (int i = 0; i < rows; i++)
+						tmp1[i] = String.valueOf(i);
+					for (int i = 0; i < badValues; i++)
+						tmp1[i] = "12345678910111212";
+					frame1.appendColumn(tmp1);
+					meta[meta.length - 1] = "INT32";
+					break;
+				}
+				default:
+					//fail fast instead of silently writing a one-column frame
+					throw new IllegalArgumentException("Unknown test case: " + test);
+			}
+			writer.writeFrameToHDFS(
+				frame1.slice(0, rows - 1, 0, 1, new FrameBlock()),
+				input("A"), rows, schema.length);
+
+			frame2.appendRow(meta);
+			writer.writeFrameToHDFS(frame2, input("M"), 1, schema.length);
+			runTest(true, false, null, -1);
+			FrameBlock frameout = readDMLFrameFromHDFS("B", InputInfo.BinaryBlockInputInfo);
+
+			//collect column 1 of the output, where the invalid values were injected
+			ArrayList<Object> data = new ArrayList<>();
+			for (int i = 0; i < frameout.getNumRows(); i++)
+				data.add(frameout.get(i, 1));
+
+			//verify that the number of nulls equals the number of injected invalid values
+			int nullNum = Math.toIntExact(data.stream().filter(s -> s == null).count());
+			Assert.assertEquals("Wrong result: " + nullNum + ".", badValues, nullNum);
+		}
+		catch (Exception ex) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlag;
+			OptimizerUtils.ALLOW_AUTO_VECTORIZATION = true;
+			OptimizerUtils.ALLOW_OPERATOR_FUSION = true;
+		}
+	}
+
+	/**
+	 * Appends column 0 (FP64) to the given frame. Its values come from getRandomMatrix with
+	 * min = Float.MAX_VALUE, max = Double.MAX_VALUE, sparsity 0.7 and a fixed seed.
+	 */
+	private void initFrameDataString(FrameBlock frame1) {
+		double[][] A = getRandomMatrix(rows, 1, Float.MAX_VALUE, Double.MAX_VALUE, 0.7, 2373);
+		double[] tmp6 = new double[rows];
+		for (int i = 0; i < rows; i++)
+			tmp6[i] = (Double) UtilFunctions.doubleToObject(ValueType.FP64, A[i][0], false);
+		frame1.appendColumn(tmp6);
+	}
+
+	/**
+	 * Appends a string column of {@code rows} random values to the given frame. The values
+	 * come from getRandomMatrix with min = 1, max = 10, sparsity 0.7 and a fixed seed, and
+	 * are converted via UtilFunctions.doubleToObject(STRING).
+	 */
+	private void appendRandomDoubleColumn(FrameBlock frame, int rows) {
+		double[][] D = getRandomMatrix(rows, 1, 1, 10, 0.7, 2373);
+		String[] col = new String[rows];
+		for (int i = 0; i < rows; i++)
+			col[i] = (String) UtilFunctions.doubleToObject(ValueType.STRING, D[i][0], false);
+		frame.appendColumn(col);
+	}
+}
diff --git a/src/test/scripts/functions/frame/DropInvalid.dml
b/src/test/scripts/functions/frame/DropInvalid.dml
new file mode 100644
index 0000000..38d2436
--- /dev/null
+++ b/src/test/scripts/functions/frame/DropInvalid.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Input frame ($1, CSV) and its one-row expected-schema frame ($2, CSV).
+frameIn = read($1, rows=$3, cols=$4, data_type="frame", format="csv");
+schemaIn = read($2, rows=1, cols=$4, data_type="frame", format="csv");
+# Replace every cell whose value does not match the expected column type.
+frameOut = dropInvalid(frameIn, schemaIn);
+# Persist the cleaned frame to $5 in binary format.
+write(frameOut, $5, format="binary");