This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
new 1a7f5ca [SYSTEMDS-392] New built-in function imputeByFD (MVI by
robust FDs)
1a7f5ca is described below
commit 1a7f5ca662597a524a052e97b5a4e6fe340938ec
Author: Shafaq Siddiqi <[email protected]>
AuthorDate: Sat May 2 23:32:47 2020 +0200
[SYSTEMDS-392] New built-in function imputeByFD (MVI by robust FDs)
Missing value imputation via robust functional dependencies
Closes #887.
---
docs/Tasks.txt | 3 +-
scripts/builtin/imputeByFD.dml | 100 ++++++++++++++
.../java/org/apache/sysds/common/Builtins.java | 1 +
.../functions/builtin/BuiltinImputeFDTest.java | 153 +++++++++++++++++++++
src/test/scripts/functions/builtin/imputeFD.dml | 21 +++
5 files changed, 277 insertions(+), 1 deletion(-)
diff --git a/docs/Tasks.txt b/docs/Tasks.txt
index 209a01c..42b2b3e 100644
--- a/docs/Tasks.txt
+++ b/docs/Tasks.txt
@@ -292,7 +292,8 @@ SYSTEMDS-380 Memory Footprint
* 371 Matrix Block Memory footprint update
SYSTEMDS-390 New Builtin Functions IV
- * 391 New GLM builtin-in function (from algorithms) OK
+ * 391 New GLM builtin function (from algorithms) OK
+ * 392 Builtin function for missing value imputation via FDs OK
Others:
* Break append instruction to cbind and rbind
diff --git a/scripts/builtin/imputeByFD.dml b/scripts/builtin/imputeByFD.dml
new file mode 100644
index 0000000..8ad523a
--- /dev/null
+++ b/scripts/builtin/imputeByFD.dml
@@ -0,0 +1,100 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Implements a builtin for imputing missing values from observed values
+# (if they exist) using robust functional dependencies
+# INPUT PARAMETERS:
+#
---------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+#
---------------------------------------------------------------------------------------------
+# F                Frame[String]  --  Data frame
+# sourceAttribute  Integer        --  index of the source attribute to use for imputation and error correction
+# targetAttribute  Integer        --  index of the attribute to be fixed
+# threshold        Double         --  threshold value in interval [0, 1] for robust FDs
+#
---------------------------------------------------------------------------------------------
+
+
+#Output(s)
+#
---------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+#
---------------------------------------------------------------------------------------------
+# imputed_F  Frame[String]  ---  Frame with possible imputations
+
+
+s_imputeByFD = function(Frame[String] F, Integer sourceAttribute, Integer targetAttribute, Double threshold)
+  return(Frame[String] imputed_F)
+{
+  # Imputes missing values and corrects errors in column targetAttribute of F
+  # based on the values that co-occur with column sourceAttribute.
+  # Steps: validate arguments, recode the STRING/BOOLEAN columns to numeric ids,
+  # run imputeAndCorrect on the two selected columns, decode back into a frame.
+
+  # sanity checks
+  if( threshold < 0 | threshold > 1 )
+    stop("Stopping due to invalid input, threshold required in interval [0, 1] found "+threshold)
+
+  # column indexes are 1-based, so 0 has to be rejected as well
+  if(sourceAttribute < 1 | sourceAttribute > ncol(F) | targetAttribute < 1 | targetAttribute > ncol(F))
+    stop("Stopping due to invalid source and target")
+
+
+  # detect schema for transformation
+  schema = detectSchema(F)
+  # collect the ids of all STRING and BOOLEAN columns (each id followed by a comma)
+  s=""
+  for(i in 1: ncol(F)) {
+    if(as.scalar(schema[1,i]) == "STRING" | as.scalar(schema[1,i]) == "BOOLEAN" )
+      s = s+as.integer(i)+",";
+  }
+
+  # recode data frame
+  jspecR = "{ids:true, recode:["+s+"]}";
+  [X, M] = transformencode(target=F, spec=jspecR);
+
+  # impute missing values and fix errors
+  X[,targetAttribute] = imputeAndCorrect(X[,sourceAttribute], X[,targetAttribute], threshold)
+
+  # getting the actual data back
+  dF = transformdecode(target=X, spec=jspecR, meta=M);
+  imputed_F = dF;
+}
+
+# Helper of s_imputeByFD: derives one target value per distinct source value from the
+# source/target contingency table and assigns it to every row with that source value.
+#   X          column of source values; NaN and 0 entries are treated as missing
+#   Y          column of target values; NaN and 0 entries are treated as missing
+#   threshold  a target value is kept as candidate for a source value only if its
+#              share of that source value's counts is strictly greater than threshold
+#   imputed_Y  one value per row of X; 0 where no candidate was found
+# presumably X and Y hold recode ids (>= 1), which is why 0 can stand for missing -- confirm
+imputeAndCorrect = function(Matrix[Double] X, Matrix[Double] Y, Double
threshold)
+ return(Matrix[Double] imputed_Y) {
+
+ XY = cbind(X, Y)
+
+ # replace the NaN values with zero
+ XY = replace(target = XY, pattern=NaN, replacement=0)
+ # 1 for every cell that is zero now (former NaN as well as original zeros)
+ missing_mask = (XY == 0)
+
+ # map the missing values to an arbitrary number (i.e., Max values + 1)
+ # colMaxs is taken per column, so the placeholder becomes the largest value of its column
+ XY = missing_mask * (colMaxs(XY)+1) + XY
+
+ # create mapping between source and target
+ # (contingency table: rows = source values, columns = target values, weight 1 per pair)
+ ctab = table(XY[,1], XY[,2], 1)
+
+ # remove the table column representing missing values
+ # (the placeholder is the column maximum, hence the last column of ctab)
+ if(sum(missing_mask[,2]) > 0)
+ ctab = ctab[,1:ncol(ctab)-1]
+
+ # turn counts into row-wise shares and binarize them against the threshold
+ ctab = ctab/(rowSums(ctab)) > threshold
+
+ # Get the most frequent mapped value of Y
+ # ans holds the column index wherever a cell equals its row maximum, 0 elsewhere
+ # NOTE(review): ctab is already binary here, so if several columns pass the threshold
+ # the largest column index is taken, not necessarily the most frequent one -- confirm
+ ans = (ctab == rowMaxs(ctab)) * t(seq(1, ncol(ctab))) # rowIndexMax(ctab)?
+ # a row without any candidate is all zero, so every column matches the row maximum and
+ # rowSums(ans) equals 1+2+...+ncol; tabMax is 0 exactly for such rows
+ # NOTE(review): with a single column in ctab a valid match also sums to 1 == 1*(1+1)/2,
+ # so it would be masked out as well -- confirm
+ tabMax = rowSums(ans) != (ncol(ans) * ((ncol(ans))+1)/2) # vector for controlling max(0)
+ filled = rowMaxs(ans) * tabMax
+ # table(seq, XY[,1]) is a row-to-source-value indicator matrix; multiplying it with
+ # filled looks up the chosen target value for every input row
+ imputed_Y = table(seq(1,nrow(X)), XY[,1]) %*% filled;
+}
diff --git a/src/main/java/org/apache/sysds/common/Builtins.java
b/src/main/java/org/apache/sysds/common/Builtins.java
index 4c8e1ff..b738d40 100644
--- a/src/main/java/org/apache/sysds/common/Builtins.java
+++ b/src/main/java/org/apache/sysds/common/Builtins.java
@@ -96,6 +96,7 @@ public enum Builtins {
IMG_MIRROR("img_mirror", true),
IMG_BRIGHTNESS("img_brightness", true),
IMG_CROP("img_crop", true),
+ IMPUTE_FD("imputeByFD", true),
INTERQUANTILE("interQuantile", false),
INTERSECT("intersect", true),
INVERSE("inv", "inverse", false),
diff --git
a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinImputeFDTest.java
b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinImputeFDTest.java
new file mode 100644
index 0000000..2f094e3
--- /dev/null
+++
b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinImputeFDTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.builtin;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.io.FrameWriter;
+import org.apache.sysds.runtime.io.FrameWriterFactory;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.InputInfo;
+import org.apache.sysds.runtime.matrix.data.OutputInfo;
+import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+import java.io.IOException;
+
+/**
+ * Tests the imputeByFD builtin: a small frame whose third column contains wrong or
+ * missing country names is written as CSV, the DML test script is executed in CP and
+ * SPARK mode, and the resulting frame is compared cell by cell with the expected frame.
+ */
+public class BuiltinImputeFDTest extends AutomatedTestBase {
+
+    private static final String TEST_NAME = "imputeFD";
+    private static final String TEST_DIR = "functions/builtin/";
+    private static final String TEST_CLASS_DIR = TEST_DIR + BuiltinImputeFDTest.class.getSimpleName() + "/";
+    private static final int rows = 11;
+    private static final int cols = 4;
+    private static final double epsilon = 0.0000000001;
+
+    private static final Types.ValueType[] schema =
+        {Types.ValueType.BOOLEAN, Types.ValueType.STRING, Types.ValueType.STRING, Types.ValueType.FP64};
+
+    @Override
+    public void setUp() {
+        TestUtils.clearAssertionInformation();
+        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"C"}));
+    }
+
+    @Test
+    public void test1() throws IOException {
+        runImpute_RFDTests(2, 3, 0.6, 1, LopProperties.ExecType.CP);
+    }
+
+    @Test
+    public void test2() throws IOException {
+        runImpute_RFDTests(2, 3, 0.45, 2, LopProperties.ExecType.CP);
+    }
+
+    @Test
+    public void test3() throws IOException {
+        runImpute_RFDTests(2, 3, 0.6, 1, LopProperties.ExecType.SPARK);
+    }
+
+    @Test
+    public void test4() throws IOException {
+        runImpute_RFDTests(2, 3, 0.4, 2, LopProperties.ExecType.SPARK);
+    }
+
+    /**
+     * Writes the input frame, runs the DML script and verifies its output.
+     *
+     * @param source    column index passed to the script as source attribute
+     * @param target    column index passed to the script as target attribute
+     * @param threshold threshold passed to the script
+     * @param test      id of the input data variant (1 or 2), see initFrameDataString
+     * @param instType  execution type under which the script is run
+     * @throws IOException if writing the input frame or reading the result fails
+     */
+    private void runImpute_RFDTests(int source, int target, double threshold, int test,
+        LopProperties.ExecType instType) throws IOException
+    {
+        Types.ExecMode platformOld = setExecMode(instType);
+        try {
+            loadTestConfiguration(getTestConfiguration(TEST_NAME));
+            String HOME = SCRIPT_DIR + TEST_DIR;
+            fullDMLScriptName = HOME + TEST_NAME + ".dml";
+            programArgs = new String[] {"-args", input("A"), String.valueOf(source),
+                String.valueOf(target), String.valueOf(threshold), output("B")};
+
+            // initialize the frame data and write it as CSV input "A"
+            FrameBlock frame1 = new FrameBlock(schema);
+            FrameWriter writer = FrameWriterFactory.createFrameWriter(OutputInfo.CSVOutputInfo);
+            double[][] A = getRandomMatrix(rows, cols, 0, 1, 0.7, -1);
+            initFrameDataString(frame1, A, test);
+            writer.writeFrameToHDFS(frame1.slice(0, rows - 1, 0, schema.length - 1, new FrameBlock()),
+                input("A"), rows, schema.length);
+
+            runTest(true, false, null, -1);
+
+            // compare script output "B" with the expected frame
+            FrameBlock frameRead = readDMLFrameFromHDFS("B", InputInfo.BinaryBlockInputInfo);
+            FrameBlock realFrame = trueOutput(A);
+            verifyFrameData(frameRead, realFrame, schema);
+        }
+        finally {
+            rtplatform = platformOld;
+        }
+    }
+
+    /**
+     * Derives the boolean and long columns from the random data. The boolean value
+     * is also written back into data[i][1] as 1 or 0, so calling this again on the
+     * same array yields the same boolean column.
+     */
+    private static void fillNumericColumns(double[][] data, boolean[] b, long[] l) {
+        for (int i = 0; i < rows; i++) {
+            b[i] = (Boolean) UtilFunctions.doubleToObject(Types.ValueType.BOOLEAN, data[i][1], false);
+            data[i][1] = b[i] ? 1 : 0;
+            l[i] = (Long) UtilFunctions.doubleToObject(Types.ValueType.INT64, data[i][2], false);
+        }
+    }
+
+    /**
+     * Appends the four input columns to frame1. The second string column holds the
+     * values to be repaired by the script; the variant is selected by test.
+     *
+     * @throws IllegalArgumentException if test is neither 1 nor 2
+     */
+    private static void initFrameDataString(FrameBlock frame1, double[][] data, int test) {
+        boolean[] b = new boolean[rows];
+        long[] l = new long[rows];
+        fillNumericColumns(data, b, l);
+
+        String[] s1;
+        String[] s2;
+        switch (test) {
+            case 1:
+                // wrong values only
+                s1 = new String[] {"TU-Graz", "TU-Graz", "TU-Graz", "IIT", "IIT", "IIT", "IIT",
+                    "SIBA", "SIBA", "SIBA", "TU-Wien"};
+                s2 = new String[] {"Austria", "Austria", "Austria", "India", "IIT", "India", "India",
+                    "Pakistan", "Pakistan", "Austria", "Austria"};
+                break;
+            case 2:
+                // wrong values and one missing value
+                s1 = new String[] {"TU-Graz", "TU-Graz", "TU-Graz", "IIT", "IIT", "IIT", "IIT",
+                    "SIBA", "SIBA", "SIBA", "TU-Wien"};
+                s2 = new String[] {"Austria", "Austria", "Austria", "India", "IIT", "In", "India",
+                    "Pakistan", "Pakistan", null, "Austria"};
+                break;
+            default:
+                // fail early instead of appending null columns
+                throw new IllegalArgumentException("Unsupported test case id: " + test);
+        }
+
+        frame1.appendColumn(b);
+        frame1.appendColumn(s1);
+        frame1.appendColumn(s2);
+        frame1.appendColumn(l);
+    }
+
+    /**
+     * Builds the expected frame: same boolean, source and long columns as the input,
+     * with every country in the target column matching its institution.
+     */
+    private static FrameBlock trueOutput(double[][] data) {
+        FrameBlock frame1 = new FrameBlock(schema);
+        boolean[] b = new boolean[rows];
+        long[] l = new long[rows];
+        fillNumericColumns(data, b, l);
+
+        String[] s1 = {"TU-Graz", "TU-Graz", "TU-Graz", "IIT", "IIT", "IIT", "IIT",
+            "SIBA", "SIBA", "SIBA", "TU-Wien"};
+        String[] s2 = {"Austria", "Austria", "Austria", "India", "India", "India", "India",
+            "Pakistan", "Pakistan", "Pakistan", "Austria"};
+
+        frame1.appendColumn(b);
+        frame1.appendColumn(s1);
+        frame1.appendColumn(s2);
+        frame1.appendColumn(l);
+        return frame1;
+    }
+
+    /**
+     * Compares both frames cell by cell (over the dimensions of frame1) after
+     * converting each value to the schema type of its column; fails on the first mismatch.
+     */
+    private static void verifyFrameData(FrameBlock frame1, FrameBlock frame2, Types.ValueType[] schema) {
+        for (int i = 0; i < frame1.getNumRows(); i++) {
+            for (int j = 0; j < frame1.getNumColumns(); j++) {
+                Object val1 = UtilFunctions.stringToObject(schema[j],
+                    UtilFunctions.objectToString(frame1.get(i, j)));
+                Object val2 = UtilFunctions.stringToObject(schema[j],
+                    UtilFunctions.objectToString(frame2.get(i, j)));
+                if (TestUtils.compareToR(schema[j], val1, val2, epsilon) != 0) {
+                    Assert.fail("The DML data for cell (" + i + "," + j + ") is " + val1
+                        + ", not same as the expected value " + val2);
+                }
+            }
+        }
+    }
+}
diff --git a/src/test/scripts/functions/builtin/imputeFD.dml
b/src/test/scripts/functions/builtin/imputeFD.dml
new file mode 100644
index 0000000..9325562
--- /dev/null
+++ b/src/test/scripts/functions/builtin/imputeFD.dml
@@ -0,0 +1,21 @@
+#-------------------------------------------------------------
+#
+# Copyright 2020 Graz University of Technology
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#-------------------------------------------------------------
+
+F = read($1, data_type="frame", format="csv", header=FALSE);  # $1: input frame (CSV without header)
+imputed = imputeByFD(F, $2, $3, $4);  # $2: source column, $3: target column, $4: threshold
+write(imputed, $5, format="binary")  # $5: output location