This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch research/timeseries-master-data in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b00b77b75b0217d371b67aa90465880e35519b3f Author: Husaimawx <[email protected]> AuthorDate: Thu Aug 4 10:38:10 2022 +0800 Research/timeseries master data (#6890) * add new udf function(MasterRepair) * add docs of new udf function(MasterRepair) --- docs/UserGuide/UDF-Library/Data-Repairing.md | 61 ++++ docs/zh/UserGuide/UDF-Library/Data-Repairing.md | 62 ++++ .../iotdb/library/drepair/UDTFMasterRepair.java | 79 +++++ .../iotdb/library/drepair/util/KDTreeUtil.java | 336 +++++++++++++++++++++ .../library/drepair/util/MasterRepairUtil.java | 260 ++++++++++++++++ 5 files changed, 798 insertions(+) diff --git a/docs/UserGuide/UDF-Library/Data-Repairing.md b/docs/UserGuide/UDF-Library/Data-Repairing.md index a6832fa244..2c1740e0c7 100644 --- a/docs/UserGuide/UDF-Library/Data-Repairing.md +++ b/docs/UserGuide/UDF-Library/Data-Repairing.md @@ -352,4 +352,65 @@ Output series: |2020-01-01T00:00:28.000+08:00| 126.0| |2020-01-01T00:00:30.000+08:00| 128.0| +-----------------------------+-------------------------------------------------+ +``` + +## MasterRepair + +### Usage + +This function is used to clean time series with master data. + +**Name**: MasterRepair +**Input Series:** Supports multiple input series. The types are INT32 / INT64 / FLOAT / DOUBLE. + +**Parameters:** + ++ `omega`: The window size. It is a non-negative integer in milliseconds. By default, it will be estimated according to the distances of two tuples with various time differences. ++ `eta`: The distance threshold. It is a positive number. By default, it will be estimated according to the distance distribution of tuples in windows. ++ `k`: The number of neighbors in master data. It is a positive integer. By default, it will be estimated according to the tuple distance of the k-th nearest neighbor in the master data. ++ `output_column`: The 1-based index of the repaired column to output. It defaults to 1, which outputs the repair result of the first column. + +**Output Series:** Output a single series.
The type is the same as the input. This series is the input after repairing. + +### Examples + +Input series: + +``` ++-----------------------------+------------+------------+------------+------------+------------+------------+ +| Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3| ++-----------------------------+------------+------------+------------+------------+------------+------------+ +|2021-07-01T12:00:01.000+08:00| 1704| 1154.55| 0.195| 1704| 1154.55| 0.195| +|2021-07-01T12:00:02.000+08:00| 1702| 1152.30| 0.193| 1702| 1152.30| 0.193| +|2021-07-01T12:00:03.000+08:00| 1702| 1148.65| 0.192| 1702| 1148.65| 0.192| +|2021-07-01T12:00:04.000+08:00| 1701| 1145.20| 0.194| 1701| 1145.20| 0.194| +|2021-07-01T12:00:07.000+08:00| 1703| 1150.55| 0.195| 1703| 1150.55| 0.195| +|2021-07-01T12:00:08.000+08:00| 1694| 1151.55| 0.193| 1704| 1151.55| 0.193| +|2021-07-01T12:01:09.000+08:00| 1705| 1153.55| 0.194| 1705| 1153.55| 0.194| +|2021-07-01T12:01:10.000+08:00| 1706| 1152.30| 0.190| 1706| 1152.30| 0.190| ++-----------------------------+------------+------------+------------+------------+------------+------------+ +``` + +SQL for query: + +```sql +select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test +``` + +Output series: + + +``` ++-----------------------------+-------------------------------------------------------------------------------------------+ +| Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)| ++-----------------------------+-------------------------------------------------------------------------------------------+ +|2021-07-01T12:00:01.000+08:00| 1704| +|2021-07-01T12:00:02.000+08:00| 1702| +|2021-07-01T12:00:03.000+08:00| 1702| +|2021-07-01T12:00:04.000+08:00| 1701| +|2021-07-01T12:00:07.000+08:00| 1703| +|2021-07-01T12:00:08.000+08:00| 1704| +|2021-07-01T12:01:09.000+08:00| 1705| +|2021-07-01T12:01:10.000+08:00| 1706| 
++-----------------------------+-------------------------------------------------------------------------------------------+ ``` \ No newline at end of file diff --git a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md index e75d7c1782..c5d92fbfdd 100644 --- a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md +++ b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md @@ -343,4 +343,66 @@ select valuerepair(s1,'method'='LsGreedy') from root.test.d2 |2020-01-01T00:00:28.000+08:00| 126.0| |2020-01-01T00:00:30.000+08:00| 128.0| +-----------------------------+-------------------------------------------------+ +``` + +## MasterRepair + +### 函数简介 + +本函数实现基于主数据的时间序列数据修复。 + +**函数名:** MasterRepair + +**输入序列:** 支持多个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。 + +**参数:** + +- `omega`:算法窗口大小,非负整数(单位为毫秒), 在缺省情况下,算法根据不同时间差下的两个元组距离自动估计该参数。 +- `eta`:算法距离阈值,正数, 在缺省情况下,算法根据窗口中元组的距离分布自动估计该参数。 +- `k`:主数据中的近邻数量,正整数, 在缺省情况下,算法根据主数据中的k个近邻的元组距离自动估计该参数。 +- `output_column`:输出列的序号,默认输出第一列的修复结果。 + +**输出序列:** 输出单个序列,类型与输入数据中对应列的类型相同,序列为输入列修复后的结果。 + +### 使用示例 + +输入序列: + +``` ++-----------------------------+------------+------------+------------+------------+------------+------------+ +| Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3| ++-----------------------------+------------+------------+------------+------------+------------+------------+ +|2021-07-01T12:00:01.000+08:00| 1704| 1154.55| 0.195| 1704| 1154.55| 0.195| +|2021-07-01T12:00:02.000+08:00| 1702| 1152.30| 0.193| 1702| 1152.30| 0.193| +|2021-07-01T12:00:03.000+08:00| 1702| 1148.65| 0.192| 1702| 1148.65| 0.192| +|2021-07-01T12:00:04.000+08:00| 1701| 1145.20| 0.194| 1701| 1145.20| 0.194| +|2021-07-01T12:00:07.000+08:00| 1703| 1150.55| 0.195| 1703| 1150.55| 0.195| +|2021-07-01T12:00:08.000+08:00| 1694| 1151.55| 0.193| 1704| 1151.55| 0.193| +|2021-07-01T12:01:09.000+08:00| 1705| 1153.55| 0.194| 1705| 1153.55| 0.194| +|2021-07-01T12:01:10.000+08:00| 1706| 1152.30| 0.190|
1706| 1152.30| 0.190| ++-----------------------------+------------+------------+------------+------------+------------+------------+ +``` + +用于查询的 SQL 语句: + +```sql +select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test +``` + +输出序列: + + +``` ++-----------------------------+-------------------------------------------------------------------------------------------+ +| Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)| ++-----------------------------+-------------------------------------------------------------------------------------------+ +|2021-07-01T12:00:01.000+08:00| 1704| +|2021-07-01T12:00:02.000+08:00| 1702| +|2021-07-01T12:00:03.000+08:00| 1702| +|2021-07-01T12:00:04.000+08:00| 1701| +|2021-07-01T12:00:07.000+08:00| 1703| +|2021-07-01T12:00:08.000+08:00| 1704| +|2021-07-01T12:01:09.000+08:00| 1705| +|2021-07-01T12:01:10.000+08:00| 1706| ++-----------------------------+-------------------------------------------------------------------------------------------+ ``` \ No newline at end of file diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java new file mode 100644 index 0000000000..07e7eae093 --- /dev/null +++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java @@ -0,0 +1,79 @@ +package org.apache.iotdb.library.drepair; + +import org.apache.iotdb.library.drepair.util.MasterRepairUtil; +import org.apache.iotdb.udf.api.UDTF; +import org.apache.iotdb.udf.api.access.Row; +import org.apache.iotdb.udf.api.collector.PointCollector; +import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; +import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator; +import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; +import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy; +import org.apache.iotdb.udf.api.type.Type; + 
+import java.util.ArrayList; + +/** UDTF that repairs time series using master data: rows are buffered in transform() and repaired and emitted in terminate() via MasterRepairUtil. */ public class UDTFMasterRepair implements UDTF { + private MasterRepairUtil masterRepairUtil; + /* 1-based index of the repaired column to emit. */ private int output_column; + + /** Checks the input series types and the optional omega / eta / k / output_column attributes. */ @Override + public void validate(UDFParameterValidator validator) throws Exception { + /* NOTE(review): this loop is bounded by the number of attributes (omega, eta, ...), not by the number of input series, so series types are only checked for indexes below the attribute count; iterating over the input data types (as beforeStart does via getDataTypes()) looks intended - confirm. */ for (int i = 0; i < validator.getParameters().getAttributes().size(); i++) { + validator.validateInputSeriesDataType(i, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64); + } + if (validator.getParameters().hasAttribute("omega")) { + validator.validate( + omega -> (int) omega >= 0, + "Parameter omega should be non-negative.", + validator.getParameters().getInt("omega")); + } + if (validator.getParameters().hasAttribute("eta")) { + validator.validate( + eta -> (double) eta > 0, + "Parameter eta should be larger than 0.", + validator.getParameters().getDouble("eta")); + } + if (validator.getParameters().hasAttribute("k")) { + validator.validate( + k -> (int) k > 0, + "Parameter k should be a positive integer.", + validator.getParameters().getInt("k")); + } + /* NOTE(review): output_column is not checked against the number of repaired columns (input count / 2); a larger value only fails later in MasterRepairUtil.getCleanResultColumn. */ if (validator.getParameters().hasAttribute("output_column")) { + validator.validate( + output_column -> (int) output_column > 0, + "Parameter output_column should be a positive integer.", + validator.getParameters().getInt("output_column")); + } + } + + /** Reads the parameters and creates the MasterRepairUtil that buffers and repairs the rows. */ @Override + public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) + throws Exception { + configurations.setAccessStrategy(new RowByRowAccessStrategy()); + /* NOTE(review): the output is always DOUBLE, while the user guide states the output type equals the input type - one of the two should be aligned. */ configurations.setOutputDataType(Type.DOUBLE); + /* The first half of the inputs are the series to repair, the second half the master data (see MasterRepairUtil.addRow). NOTE(review): an odd number of inputs is not rejected. */ int columnCnt = parameters.getDataTypes().size() / 2; + /* -1 / NaN mean "estimate automatically" (see MasterRepairUtil.set_parameters). */ long omega = parameters.getLongOrDefault("omega", -1); + double eta = parameters.getDoubleOrDefault("eta", Double.NaN); + int k = parameters.getIntOrDefault("k", -1); + masterRepairUtil = new MasterRepairUtil(columnCnt, omega, eta, k); + output_column = parameters.getIntOrDefault("output_column", 1); + } + + /** Buffers every row that has at least one non-null column; nothing is emitted until terminate(). */ @Override + public void transform(Row row, PointCollector collector) throws Exception { + if (!masterRepairUtil.isNullRow(row)) { +
masterRepairUtil.addRow(row); + } + } + + /** Repairs all buffered rows and emits column output_column of the result at the original timestamps. NOTE(review): if no row was buffered, repair() throws (fillNullValue reads the first tuple). */ @Override + public void terminate(PointCollector collector) throws Exception { + masterRepairUtil.repair(); + ArrayList<Long> times = masterRepairUtil.getTime(); + ArrayList<Double> column = masterRepairUtil.getCleanResultColumn(this.output_column); + for (int i = 0; i < column.size(); i++) { + collector.putDouble(times.get(i), column.get(i)); + } + } +} diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java new file mode 100644 index 0000000000..49f2c48da0 --- /dev/null +++ b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java @@ -0,0 +1,336 @@ +package org.apache.iotdb.library.drepair.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Stack; + +import static java.lang.Math.min; +import static java.lang.Math.sqrt; + +/** KD-tree over master-data tuples supporting nearest-neighbour queries under a per-dimension std-normalised Euclidean distance. */ public class KDTreeUtil { + private Node kdtree; + + private static class Node { + // Dimension this node splits on + int partitiondimension; + // Split value along partitiondimension + double partitionValue; + // Null for non-leaf nodes, + // otherwise the tuple stored in this leaf + ArrayList<Double> value; + // Whether this node is a leaf + boolean isLeaf = false; + // Left subtree + Node left; + // Right subtree + Node right; + // Per-dimension lower bound from UtilZ.maxmin (set on non-leaf nodes only) + ArrayList<Double> min; + // Per-dimension upper bound from UtilZ.maxmin (set on non-leaf nodes only) + ArrayList<Double> max; + } + + /** Builds a tree over {@code input}, using its first {@code dimension} coordinates for splitting. NOTE(review): an empty input leaves the root as a non-leaf node without children, so later queries throw NullPointerException; null coordinates make UtilZ.variance throw as well. */ public static KDTreeUtil build(ArrayList<ArrayList<Double>> input, int dimension) { + KDTreeUtil tree = new KDTreeUtil(); + tree.kdtree = new Node(); + tree.buildDetail(tree.kdtree, input, dimension); + return tree; + } + + /** Recursively splits {@code data} at the median of its highest-variance dimension. */ private void buildDetail(Node node, ArrayList<ArrayList<Double>> data, int dimensions) { + if (data.size() == 0) { + return; + } + if (data.size() == 1) { + node.isLeaf = true; + node.value = data.get(0); + return; + } + // Split on the dimension with the largest variance + node.partitiondimension = -1; + double var = -1; + double tmpvar; + for (int i = 0; i < dimensions; i++) { + tmpvar = UtilZ.variance(data, i); + if (tmpvar > var) { + var = tmpvar; + node.partitiondimension =
i; + } + } + // The largest variance is 0, so all tuples are identical in every considered dimension: make this a leaf + if (var == 0d) { + node.isLeaf = true; + node.value = data.get(0); + return; + } + + // Split value: the (upper) median along the chosen dimension + node.partitionValue = UtilZ.median(data, node.partitiondimension); + + ArrayList<ArrayList<Double>> maxmin = UtilZ.maxmin(data, dimensions); + node.min = maxmin.get(0); + node.max = maxmin.get(1); + + ArrayList<ArrayList<Double>> left = new ArrayList<>(); + ArrayList<ArrayList<Double>> right = new ArrayList<>(); + + for (ArrayList<Double> d : data) { + if (d.get(node.partitiondimension) < node.partitionValue) { + left.add(d); + } else if (d.get(node.partitiondimension) > node.partitionValue) { + right.add(d); + } + } + /* Tuples equal to the split value go right, except that the first one goes left while the left side is still empty. */ for (ArrayList<Double> d : data) { + if (d.get(node.partitiondimension) == node.partitionValue) { + if (left.size() == 0) { + left.add(d); + } else { + right.add(d); + } + } + } + + Node leftnode = new Node(); + Node rightnode = new Node(); + node.left = leftnode; + node.right = rightnode; + buildDetail(leftnode, left, dimensions); + buildDetail(rightnode, right, dimensions); + } + + /** Returns the stored tuple nearest to {@code input} (subject to the pruning bounds from UtilZ.maxmin): descends to a leaf, then backtracks via queryRec over the sibling subtrees pushed on the way down. */ public ArrayList<Double> query(ArrayList<Double> input, double[] std) { + Node node = kdtree; + Stack<Node> stack = new Stack<>(); + while (!node.isLeaf) { + if (input.get(node.partitiondimension) < node.partitionValue) { + stack.add(node.right); + node = node.left; + } else { + stack.push(node.left); + node = node.right; + } + } + + double distance = UtilZ.distance(input, node.value, std); + ArrayList<Double> nearest = queryRec(input, distance, stack, std); + return nearest == null ?
node.value : nearest; + } + + /** Backtracking step of query: pops pending subtrees, skipping those whose bounding box is not closer than the best distance so far, and returns the closest leaf tuple strictly closer than {@code distance}, or null if there is none. */ public ArrayList<Double> queryRec( + ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) { + ArrayList<Double> nearest = null; + Node node; + double tdis; + while (stack.size() != 0) { + node = stack.pop(); + if (node.isLeaf) { + tdis = UtilZ.distance(input, node.value, std); + if (tdis < distance) { + distance = tdis; + nearest = node.value; + } + } else { + double mindistance = UtilZ.mindistance(input, node.max, node.min, std); + if (mindistance < distance) { + while (!node.isLeaf) { + if (input.get(node.partitiondimension) < node.partitionValue) { + stack.add(node.right); + node = node.left; + } else { + stack.push(node.left); + node = node.right; + } + } + tdis = UtilZ.distance(input, node.value, std); + if (tdis < distance) { + distance = tdis; + nearest = node.value; + } + } + } + } + return nearest; + } + + /** Like queryRec, but returns every leaf tuple that improved on the best distance seen so far, in discovery order. NOTE(review): a tuple is only collected when it beats the running best, so this is not an exact k-nearest-neighbour search. */ public ArrayList<ArrayList<Double>> queryRecKNN( + ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) { + ArrayList<ArrayList<Double>> nearest = new ArrayList<>(); + Node node; + double tdis; + while (stack.size() != 0) { + node = stack.pop(); + if (node.isLeaf) { + tdis = UtilZ.distance(input, node.value, std); + if (tdis < distance) { + distance = tdis; + nearest.add(node.value); + } + } else { + /* + * mindistance is the minimum distance from the query point to the hyper-rectangle + * covered by this node. If mindistance < distance, a closer tuple may exist in + * this node's subtree; otherwise no closer tuple can be found there. + */ + double mindistance = UtilZ.mindistance(input, node.max, node.min, std); + if (mindistance < distance) { + while (!node.isLeaf) { + if (input.get(node.partitiondimension) < node.partitionValue) { + stack.add(node.right); + node = node.left; + } else { + stack.push(node.left); + node = node.right; + } + } + tdis = UtilZ.distance(input, node.value, std); + if (tdis < distance) { + distance = tdis; + nearest.add(node.value); + } + } + } + } + return nearest; + } + + /** Removes and returns the element of {@code nearest} closest to {@code input}. NOTE(review): throws IndexOutOfBoundsException when {@code nearest} is empty. */ public ArrayList<Double> findNearest( + ArrayList<Double> input, ArrayList<ArrayList<Double>> nearest, double[] std) {
+ double min_dis = Double.MAX_VALUE; + int min_index = 0; + for (int i = 0; i < nearest.size(); i++) { + double dis = UtilZ.distance(input, nearest.get(i), std); + if (dis < min_dis) { + min_dis = dis; + min_index = i; + } + } + ArrayList<Double> nt = nearest.get(min_index); + nearest.remove(min_index); + return nt; + } + + /** Returns up to k tuples found near {@code input}, nearest first; falls back to the leaf reached by the initial descent when the backtracking search collects nothing. */ public ArrayList<ArrayList<Double>> queryKNN(ArrayList<Double> input, int k, double[] std) { + ArrayList<ArrayList<Double>> kNearest = new ArrayList<>(); + Node node = kdtree; + Stack<Node> stack = new Stack<>(); + while (!node.isLeaf) { + if (input.get(node.partitiondimension) < node.partitionValue) { + stack.add(node.right); + node = node.left; + } else { + stack.push(node.left); + node = node.right; + } + } + double distance = UtilZ.distance(input, node.value, std); + ArrayList<ArrayList<Double>> nearest = queryRecKNN(input, distance, stack, std); + /* NOTE(review): min(k, nearest.size()) is re-evaluated after findNearest removes an element, so at most ceil(nearest.size() / 2) tuples are returned; the bound should be computed once before the loop. */ for (int i = 0; i < min(k, nearest.size()); i++) { + kNearest.add(findNearest(input, nearest, std)); + } + if (kNearest.size() == 0) { + kNearest.add(node.value); + } + /* NOTE(review): the distances computed here are discarded; this loop has no effect. */ for (ArrayList<Double> doubles : kNearest) { + UtilZ.distance(doubles, input, std); + } + return kNearest; + } + + /** A tuple paired with its distance, ordered by distance. Not referenced by KDTreeUtil or MasterRepairUtil. */ public static class TupleWithDistance implements Comparable<TupleWithDistance> { + private final Double distance; + private final ArrayList<Double> tuple; + + public TupleWithDistance(Double distance, ArrayList<Double> tuple) { + this.distance = distance; + this.tuple = tuple; + } + + @Override + public int compareTo(TupleWithDistance t) { + return this.distance.compareTo(t.distance); + } + + public Double getDistance() { + return distance; + } + + public ArrayList<Double> getTuple() { + return tuple; + } + } + + /** Numeric helpers for building and querying the tree. */ private static class UtilZ { + + /** Population variance of coordinate {@code dimension} over {@code data}. */ static double variance(ArrayList<ArrayList<Double>> data, int dimension) { + double sum = 0d; + for (ArrayList<Double> d : data) { + sum += d.get(dimension); + } + double avg = sum / data.size(); + double ans = 0d; + for (ArrayList<Double> d : data) { + double temp =
d.get(dimension) - avg; + ans += temp * temp; + } + return ans / data.size(); + } + + /** Element at index size/2 of the sorted coordinate {@code dimension} (the upper median for even sizes). */ static double median(ArrayList<ArrayList<Double>> data, int dimension) { + ArrayList<Double> d = new ArrayList<>(); + for (ArrayList<Double> k : data) { + d.add(k.get(dimension)); + } + Collections.sort(d); + int pos = d.size() / 2; + return d.get(pos); + } + + /** Intended to return [per-dimension minimums, per-dimension maximums] of {@code data}. NOTE(review): three defects make the bounds wrong - the loop starts at j = 1, skipping data.get(0); max_temp starts at Double.MIN_VALUE (the smallest positive double, not the most negative), so an all-negative dimension gets a wrong maximum; and the else-if never updates the maximum for a value that also lowered the minimum (always true for the first value examined). mindistance prunes with these bounds, so queries can miss closer tuples. Likely fix: start j at 0, seed max_temp with Double.NEGATIVE_INFINITY, and use two independent ifs. */ static ArrayList<ArrayList<Double>> maxmin(ArrayList<ArrayList<Double>> data, int dimensions) { + ArrayList<ArrayList<Double>> mm = new ArrayList<>(); + ArrayList<Double> min_v = new ArrayList<>(); + ArrayList<Double> max_v = new ArrayList<>(); + // Initialise: the first row holds the minimums, the second row the maximums + for (int i = 0; i < dimensions; i++) { + double min_temp = Double.MAX_VALUE; + double max_temp = Double.MIN_VALUE; + for (int j = 1; j < data.size(); j++) { + ArrayList<Double> d = data.get(j); + if (d.get(i) < min_temp) { + min_temp = d.get(i); + } else if (d.get(i) > max_temp) { + max_temp = d.get(i); + } + } + min_v.add(min_temp); + max_v.add(max_temp); + } + mm.add(min_v); + mm.add(max_v); + return mm; + } + + /** std-normalised Euclidean distance over the coordinates of {@code a}, skipping positions where either side is null. NOTE(review): a zero std yields NaN or Infinity. */ static double distance(ArrayList<Double> a, ArrayList<Double> b, double[] std) { + double sum = 0d; + for (int i = 0; i < a.size(); i++) { + if (a.get(i) != null && b.get(i) != null) + sum += Math.pow((a.get(i) - b.get(i)) / std[i], 2); + } + sum = sqrt(sum); + return sum; + } + + /** std-normalised distance from {@code a} to the box spanned by {@code min} and {@code max} (0 inside the box). */ static double mindistance( + ArrayList<Double> a, ArrayList<Double> max, ArrayList<Double> min, double[] std) { + double sum = 0d; + for (int i = 0; i < a.size(); i++) { + if (a.get(i) > max.get(i)) sum += Math.pow((a.get(i) - max.get(i)) / std[i], 2); + else if (a.get(i) < min.get(i)) { + sum += Math.pow((min.get(i) - a.get(i)) / std[i], 2); + } + } + sum = sqrt(sum); + return sum; + } + } +} diff --git a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java new file mode 100644 index 0000000000..f99a81a731 --- /dev/null +++
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java @@ -0,0 +1,260 @@ +package org.apache.iotdb.library.drepair.util; + +import org.apache.iotdb.library.util.Util; +import org.apache.iotdb.udf.api.access.Row; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; + +/** Buffers rows for MasterRepair and repairs the time-series tuples against the master tuples (see repair()). */ public class MasterRepairUtil { + /* td: time-series tuples to repair; td_cleaned: their repairs, in the same order; md: master tuples; td_time: timestamps of td. */ private final ArrayList<ArrayList<Double>> td = new ArrayList<>(); + private final ArrayList<ArrayList<Double>> td_cleaned = new ArrayList<>(); + private final ArrayList<ArrayList<Double>> md = new ArrayList<>(); + private final ArrayList<Long> td_time = new ArrayList<>(); + private final int columnCnt; + private long omega; + private Double eta; + private int k; + private double[] std; + private KDTreeUtil kdTreeUtil; + + /** omega == -1, eta == NaN or k == -1 request automatic estimation in set_parameters(). */ public MasterRepairUtil(int columnCnt, long omega, double eta, int k) throws Exception { + this.columnCnt = columnCnt; + this.omega = omega; + this.eta = eta; + this.k = k; + } + + /** Returns true when every column of {@code row} is null. */ public boolean isNullRow(Row row) { + boolean flag = true; + for (int i = 0; i < row.size(); i++) { + if (!row.isNull(i)) { + flag = false; + break; + } + } + return flag; + } + + /** Splits {@code row} into a time-series tuple (columns [0, columnCnt)) and a master tuple (the remaining columns), keeping each only if it has a non-null value; null values are stored as null. NOTE(review): the BigDecimal.valueOf(...).doubleValue() round trip appears to return the input unchanged except that it maps -0.0 to 0.0 and throws NumberFormatException for NaN or infinite values - presumably removable. */ public void addRow(Row row) throws Exception { + ArrayList<Double> tt = new ArrayList<>(); // time-series tuple + boolean containsNotNull = false; + for (int i = 0; i < this.columnCnt; i++) { + if (!row.isNull(i)) { + containsNotNull = true; + BigDecimal bd = BigDecimal.valueOf(Util.getValueAsDouble(row, i)); + tt.add(bd.doubleValue()); + } else { + tt.add(null); + } + } + if (containsNotNull) { + td.add(tt); + td_time.add(row.getTime()); + } + + ArrayList<Double> mt = new ArrayList<>(); // master tuple + containsNotNull = false; + for (int i = this.columnCnt; i < row.size(); i++) { + if (!row.isNull(i)) { + containsNotNull = true; + BigDecimal bd = BigDecimal.valueOf(Util.getValueAsDouble(row, i)); + mt.add(bd.doubleValue()); + } else { + mt.add(null); + } + } + if (containsNotNull) { + md.add(mt); + } + } + + /** Indexes the master tuples on their first columnCnt coordinates. */ public void
buildKDTree() { + this.kdTreeUtil = KDTreeUtil.build(md, this.columnCnt); + } + + /** Returns coordinate {@code columnPos} (1-based) of every repaired tuple. */ public ArrayList<Double> getCleanResultColumn(int columnPos) { + ArrayList<Double> column = new ArrayList<>(); + for (ArrayList<Double> tuple : this.td_cleaned) { + column.add(tuple.get(columnPos - 1)); + } + return column; + } + + /** Timestamps of the buffered time-series tuples (the internal list itself, not a copy). */ public ArrayList<Long> getTime() { + return td_time; + } + + /** std-normalised Euclidean distance over the first columnCnt coordinates. NOTE(review): unlike KDTreeUtil's distance it does not skip nulls (master tuples may contain null, which throws NullPointerException on unboxing), and a zero std yields NaN or Infinity. */ public double get_tm_distance(ArrayList<Double> t_tuple, ArrayList<Double> m_tuple) { + double distance = 0d; + for (int pos = 0; pos < columnCnt; pos++) { + double temp = t_tuple.get(pos) - m_tuple.get(pos); + temp = temp / std[pos]; + distance += temp * temp; + } + distance = Math.sqrt(distance); + return distance; + } + + /** Indexes l < i, newest first, whose timestamp is at most omega before tuple i (the window W_i). NOTE(review): unlike the equivalent loop in set_parameters it never breaks once outside the window, so every earlier tuple is scanned (quadratic overall). */ public ArrayList<Integer> cal_W(int i) { + ArrayList<Integer> W_i = new ArrayList<>(); + for (int l = i - 1; l >= 0; l--) { + if (this.td_time.get(i) <= this.td_time.get(l) + omega) { + W_i.add(l); + } + } + return W_i; + } + + /** Candidate repairs C_i: the single nearest master tuple when W_i is empty, otherwise queryKNN(k) of tuple i plus queryKNN(k) of every already-repaired tuple in W_i. */ public ArrayList<ArrayList<Double>> cal_C(int i, ArrayList<Integer> W_i) { + ArrayList<ArrayList<Double>> C_i = new ArrayList<>(); + if (W_i.size() == 0) { + C_i.add(this.kdTreeUtil.query(this.td.get(i), std)); + } else { + C_i.addAll(this.kdTreeUtil.queryKNN(this.td.get(i), k, std)); + for (Integer integer : W_i) { + C_i.addAll(this.kdTreeUtil.queryKNN(this.td_cleaned.get(integer), k, std)); + } + } + return C_i; + } + + /** Repairs tuples in order: among candidates within eta of every repaired tuple in the window, picks the one closest to the original tuple. NOTE(review): if no candidate passes the eta check an empty tuple is stored, which can later make cal_C (queryKNN on it) and getCleanResultColumn throw; falling back to the nearest candidate or the original tuple is probably intended. */ public void master_repair() { + for (int i = 0; i < this.td.size(); i++) { + ArrayList<Double> tuple = this.td.get(i); + ArrayList<Integer> W_i = cal_W(i); + ArrayList<ArrayList<Double>> C_i = this.cal_C(i, W_i); + double min_dis = Double.MAX_VALUE; + ArrayList<Double> repair_tuple = new ArrayList<>(); + for (ArrayList<Double> c_i : C_i) { + boolean smooth = true; + for (Integer w_i : W_i) { + ArrayList<Double> w_is = td_cleaned.get(w_i); + if (get_tm_distance(c_i, w_is) > eta) { + smooth = false; + break; + } + } + if (smooth) { + double dis = get_tm_distance(c_i, tuple); + if (dis < min_dis) { + min_dis = dis; + repair_tuple = c_i; +
} + } + } + this.td_cleaned.add(repair_tuple); + } + } + + /** Estimates parameters left at their automatic value: omega = 10 x the median gap between consecutive timestamps; eta = the 0.9973 quantile of distances between tuples at most omega apart; k = the smallest value in 2..5 whose 0.9 quantile of queryKNN distances exceeds eta, else 5. NOTE(review): with fewer than two tuples (or, for eta, no pair within omega) the lists are empty and get(...) throws IndexOutOfBoundsException. */ public void set_parameters() { + if (omega == -1) { + ArrayList<Long> intervals = getIntervals(); + Collections.sort(intervals); + long interval = intervals.get(intervals.size() / 2); + omega = interval * 10; + } + if (Double.isNaN(eta)) { + ArrayList<Double> distance_list = new ArrayList<>(); + for (int i = 1; i < this.td.size(); i++) { + for (int l = i - 1; l >= 0; l--) { + if (this.td_time.get(i) <= this.td_time.get(l) + omega) { + distance_list.add(get_tm_distance(this.td.get(i), this.td.get(l))); + } else break; + } + } + Collections.sort(distance_list); + eta = distance_list.get((int) (distance_list.size() * 0.9973)); + } + if (k == -1) { + for (int temp_k = 2; temp_k <= 5; temp_k++) { + ArrayList<Double> distance_list = new ArrayList<>(); + for (ArrayList<Double> tuple : this.td) { + ArrayList<ArrayList<Double>> neighbors = this.kdTreeUtil.queryKNN(tuple, temp_k, std); + for (ArrayList<Double> neighbor : neighbors) { + distance_list.add(get_tm_distance(tuple, neighbor)); + } + } + Collections.sort(distance_list); + if (distance_list.get((int) (distance_list.size() * 0.9)) > eta) { + k = temp_k; + break; + } + } + if (k == -1) { + k = 5; + } + } + } + + /** Population variance of the non-NaN values, or 0 if there are none. */ private double varianceImperative(double[] value) { + double average = 0.0; + int cnt = 0; + for (double p : value) { + if (!Double.isNaN(p)) { + cnt += 1; + average += p; + } + } + if (cnt == 0) { + return 0d; + } + average /= cnt; + + double variance = 0.0; + for (double p : value) { + if (!Double.isNaN(p)) { + variance += (p - average) * (p - average); + } + } + return variance / cnt; + } + + /** Copies coordinate {@code pos} of every time-series tuple into an array; nulls must already be filled (unboxing null throws). */ private double[] getColumn(int pos) { + double[] column = new double[this.td.size()]; + for (int i = 0; i < this.td.size(); i++) { + column[i] = this.td.get(i).get(pos); + } + return column; + } + + /** Per-column standard deviation of the time-series tuples, used to normalise every distance. */ public void call_std() { + this.std = new double[this.columnCnt]; + for (int i = 0; i < this.columnCnt; i++) { + std[i] =
Math.sqrt(varianceImperative(getColumn(i))); + } + } + + /** Runs the whole repair: fill nulls, index the master data, compute std, estimate missing parameters, then repair each tuple. NOTE(review): the System.out.println calls are leftover debug output to stdout; remove them or use a logger. */ public void repair() { + fillNullValue(); + buildKDTree(); + call_std(); + set_parameters(); + System.out.println(this.omega); + System.out.println(this.eta); + System.out.println(this.k); + master_repair(); + } + + /** Differences between consecutive timestamps of the time-series tuples. */ public ArrayList<Long> getIntervals() { + ArrayList<Long> intervals = new ArrayList<>(); + for (int i = 1; i < this.td_time.size(); i++) { + intervals.add(this.td_time.get(i) - this.td_time.get(i - 1)); + } + return intervals; + } + + /** Forward-fills nulls in each time-series column with the previous non-null value. NOTE(review): the seed td.get(0).get(i) is unboxed, so a null in the first tuple throws NullPointerException, and an empty buffer throws IndexOutOfBoundsException. */ public void fillNullValue() { + for (int i = 0; i < columnCnt; i++) { + double temp = this.td.get(0).get(i); + for (ArrayList<Double> arrayList : this.td) { + if (arrayList.get(i) == null) { + arrayList.set(i, temp); + } else { + temp = arrayList.get(i); + } + } + } + } +}
