This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch research/timeseries-master-data
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b00b77b75b0217d371b67aa90465880e35519b3f
Author: Husaimawx <[email protected]>
AuthorDate: Thu Aug 4 10:38:10 2022 +0800

    Research/timeseries master data (#6890)
    
    * add new udf function(MasterRepair)
    
    * add docs of new udf function(MasterRepair)
---
 docs/UserGuide/UDF-Library/Data-Repairing.md       |  61 ++++
 docs/zh/UserGuide/UDF-Library/Data-Repairing.md    |  62 ++++
 .../iotdb/library/drepair/UDTFMasterRepair.java    |  79 +++++
 .../iotdb/library/drepair/util/KDTreeUtil.java     | 336 +++++++++++++++++++++
 .../library/drepair/util/MasterRepairUtil.java     | 260 ++++++++++++++++
 5 files changed, 798 insertions(+)

diff --git a/docs/UserGuide/UDF-Library/Data-Repairing.md 
b/docs/UserGuide/UDF-Library/Data-Repairing.md
index a6832fa244..2c1740e0c7 100644
--- a/docs/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/UserGuide/UDF-Library/Data-Repairing.md
@@ -352,4 +352,65 @@ Output series:
 |2020-01-01T00:00:28.000+08:00|                                            
126.0|
 |2020-01-01T00:00:30.000+08:00|                                            
128.0|
 
+-----------------------------+-------------------------------------------------+
+```
+
+## MasterRepair
+
+### Usage
+
+This function is used to clean time series with master data.
+
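+**Name:** MasterRepair
+
+**Input Series:** Support multiple input series. The types are INT32 / INT64 / FLOAT / DOUBLE. The first half of the input series are the time series to be repaired and the second half are the corresponding master data columns, in the same order.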
+
+**Parameters:**
+
++ `omega`: The window size. It is a non-negative integer whose unit is 
millisecond. By default, it will be estimated according to the distances of two 
tuples with various time differences.
++ `eta`: The distance threshold. It is a positive number. By default, it will 
be estimated according to the distance distribution of tuples in windows.
++ `k`: The number of neighbors in master data. It is a positive integer. By default, it will be estimated according to the tuple distance of the k-th nearest neighbor in the master data.
++ `output_column`: The repaired column to output, defaults to 1 which means 
output the repair result of the first column.
+
+**Output Series:** Output a single series. The type is the same as the input. 
This series is the input after repairing.
+
+### Examples
+
+Input series:
+
+```
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|                         
Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|2021-07-01T12:00:01.000+08:00|        1704|     1154.55|       0.195|        
1704|     1154.55|       0.195|
+|2021-07-01T12:00:02.000+08:00|        1702|     1152.30|       0.193|        
1702|     1152.30|       0.193|
+|2021-07-01T12:00:03.000+08:00|        1702|     1148.65|       0.192|        
1702|     1148.65|       0.192|
+|2021-07-01T12:00:04.000+08:00|        1701|     1145.20|       0.194|        
1701|     1145.20|       0.194|
+|2021-07-01T12:00:07.000+08:00|        1703|     1150.55|       0.195|        
1703|     1150.55|       0.195|
+|2021-07-01T12:00:08.000+08:00|        1694|     1151.55|       0.193|        
1704|     1151.55|       0.193|
+|2021-07-01T12:01:09.000+08:00|        1705|     1153.55|       0.194|        
1705|     1153.55|       0.194|
+|2021-07-01T12:01:10.000+08:00|        1706|     1152.30|       0.190|        
1706|     1152.30|       0.190|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+```
+
+SQL for query:
+
+```sql
+select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
+```
+
+Output series:
+
+
+```
++-----------------------------+-------------------------------------------------------------------------------------------+
+|                         
Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
++-----------------------------+-------------------------------------------------------------------------------------------+
+|2021-07-01T12:00:01.000+08:00|                                                
                                       1704|
+|2021-07-01T12:00:02.000+08:00|                                                
                                       1702|
+|2021-07-01T12:00:03.000+08:00|                                                
                                       1702|
+|2021-07-01T12:00:04.000+08:00|                                                
                                       1701|
+|2021-07-01T12:00:07.000+08:00|                                                
                                       1703|
+|2021-07-01T12:00:08.000+08:00|                                                
                                       1704|
+|2021-07-01T12:01:09.000+08:00|                                                
                                       1705|
+|2021-07-01T12:01:10.000+08:00|                                                
                                       1706|
++-----------------------------+-------------------------------------------------------------------------------------------+
 ```
\ No newline at end of file
diff --git a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md 
b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
index e75d7c1782..c5d92fbfdd 100644
--- a/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
+++ b/docs/zh/UserGuide/UDF-Library/Data-Repairing.md
@@ -343,4 +343,66 @@ select valuerepair(s1,'method'='LsGreedy') from 
root.test.d2
 |2020-01-01T00:00:28.000+08:00|                                            
126.0|
 |2020-01-01T00:00:30.000+08:00|                                            
128.0|
 
+-----------------------------+-------------------------------------------------+
+```
+
+## MasterRepair
+
+### 函数简介
+
+本函数实现基于主数据的时间序列数据修复。
+
+**函数名:**MasterRepair
+
+**输入序列:** 支持多个输入序列,类型为 INT32 / INT64 / FLOAT / DOUBLE。
+
+**参数:**
+
+- `omega`:算法窗口大小,非负整数(单位为毫秒), 在缺省情况下,算法根据不同时间差下的两个元组距离自动估计该参数。
+- `eta`:算法距离阈值,正数, 在缺省情况下,算法根据窗口中元组的距离分布自动估计该参数。
+- `k`:主数据中的近邻数量,正整数, 在缺省情况下,算法根据主数据中的k个近邻的元组距离自动估计该参数。
+- `output_column`:输出列的序号,默认输出第一列的修复结果。
+
+**输出序列:**输出单个序列,类型与输入数据中对应列的类型相同,序列为输入列修复后的结果。
+
+### 使用示例
+
+输入序列:
+
+```
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|                         
Time|root.test.t1|root.test.t2|root.test.t3|root.test.m1|root.test.m2|root.test.m3|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+|2021-07-01T12:00:01.000+08:00|        1704|     1154.55|       0.195|        
1704|     1154.55|       0.195|
+|2021-07-01T12:00:02.000+08:00|        1702|     1152.30|       0.193|        
1702|     1152.30|       0.193|
+|2021-07-01T12:00:03.000+08:00|        1702|     1148.65|       0.192|        
1702|     1148.65|       0.192|
+|2021-07-01T12:00:04.000+08:00|        1701|     1145.20|       0.194|        
1701|     1145.20|       0.194|
+|2021-07-01T12:00:07.000+08:00|        1703|     1150.55|       0.195|        
1703|     1150.55|       0.195|
+|2021-07-01T12:00:08.000+08:00|        1694|     1151.55|       0.193|        
1704|     1151.55|       0.193|
+|2021-07-01T12:01:09.000+08:00|        1705|     1153.55|       0.194|        
1705|     1153.55|       0.194|
+|2021-07-01T12:01:10.000+08:00|        1706|     1152.30|       0.190|        
1706|     1152.30|       0.190|
++-----------------------------+------------+------------+------------+------------+------------+------------+
+```
+
+用于查询的 SQL 语句:
+
+```sql
+select MasterRepair(t1,t2,t3,m1,m2,m3) from root.test
+```
+
+输出序列:
+
+
+```
++-----------------------------+-------------------------------------------------------------------------------------------+
+|                         
Time|MasterRepair(root.test.t1,root.test.t2,root.test.t3,root.test.m1,root.test.m2,root.test.m3)|
++-----------------------------+-------------------------------------------------------------------------------------------+
+|2021-07-01T12:00:01.000+08:00|                                                
                                       1704|
+|2021-07-01T12:00:02.000+08:00|                                                
                                       1702|
+|2021-07-01T12:00:03.000+08:00|                                                
                                       1702|
+|2021-07-01T12:00:04.000+08:00|                                                
                                       1701|
+|2021-07-01T12:00:07.000+08:00|                                                
                                       1703|
+|2021-07-01T12:00:08.000+08:00|                                                
                                       1704|
+|2021-07-01T12:01:09.000+08:00|                                                
                                       1705|
+|2021-07-01T12:01:10.000+08:00|                                                
                                       1706|
++-----------------------------+-------------------------------------------------------------------------------------------+
 ```
\ No newline at end of file
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java
new file mode 100644
index 0000000000..07e7eae093
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/UDTFMasterRepair.java
@@ -0,0 +1,79 @@
+package org.apache.iotdb.library.drepair;
+
+import org.apache.iotdb.library.drepair.util.MasterRepairUtil;
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.ArrayList;
+
+/**
+ * UDTF that repairs a multi-column time series with the help of master data.
+ *
+ * <p>The input series are split in two halves: the first half holds the observed columns and the
+ * second half holds the master data columns. Rows are buffered in {@link #transform} and the
+ * repair itself runs once in {@link #terminate}, which emits the selected repaired column.
+ */
+public class UDTFMasterRepair implements UDTF {
+  private MasterRepairUtil masterRepairUtil;
+  // 1-based index of the repaired column that is written to the output.
+  private int outputColumn;
+
+  /**
+   * Checks the input series types and the optional attributes {@code omega}, {@code eta}, {@code
+   * k} and {@code output_column}.
+   *
+   * @param validator validator supplied by the UDF framework
+   * @throws Exception if a series type or an attribute value is rejected
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    UDFParameters parameters = validator.getParameters();
+    // Iterate over the input series, not over the attributes: the number of key-value attributes
+    // says nothing about how many series have to be type-checked.
+    int seriesCount = parameters.getDataTypes().size();
+    for (int i = 0; i < seriesCount; i++) {
+      validator.validateInputSeriesDataType(i, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64);
+    }
+    // beforeStart() halves the series count, so an odd count would silently misalign the columns.
+    validator.validate(
+        count -> (int) count > 0 && (int) count % 2 == 0,
+        "The number of input series should be a positive even number.",
+        seriesCount);
+    if (parameters.hasAttribute("omega")) {
+      // Read omega as long, the same way beforeStart() does; the default is never used here.
+      validator.validate(
+          omega -> (long) omega >= 0,
+          "Parameter omega should be non-negative.",
+          parameters.getLongOrDefault("omega", -1));
+    }
+    if (parameters.hasAttribute("eta")) {
+      validator.validate(
+          eta -> (double) eta > 0,
+          "Parameter eta should be larger than 0.",
+          parameters.getDouble("eta"));
+    }
+    if (parameters.hasAttribute("k")) {
+      validator.validate(
+          k -> (int) k > 0, "Parameter k should be a positive integer.", parameters.getInt("k"));
+    }
+    if (parameters.hasAttribute("output_column")) {
+      int columnCnt = seriesCount / 2;
+      // An index beyond the repaired columns would only fail later, inside terminate().
+      validator.validate(
+          column -> (int) column > 0 && (int) column <= columnCnt,
+          "Parameter output_column should be a positive integer not larger than the number of"
+              + " repaired columns.",
+          parameters.getInt("output_column"));
+    }
+  }
+
+  /**
+   * Configures row-by-row access with a DOUBLE output and creates the repair helper.
+   *
+   * <p>{@code omega = -1}, {@code eta = NaN} and {@code k = -1} are the sentinels that make the
+   * helper estimate the respective parameter from the data.
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations.setAccessStrategy(new RowByRowAccessStrategy());
+    // Values are converted to double while buffering, so the output is DOUBLE for every input type.
+    configurations.setOutputDataType(Type.DOUBLE);
+    int columnCnt = parameters.getDataTypes().size() / 2;
+    long omega = parameters.getLongOrDefault("omega", -1);
+    double eta = parameters.getDoubleOrDefault("eta", Double.NaN);
+    int k = parameters.getIntOrDefault("k", -1);
+    masterRepairUtil = new MasterRepairUtil(columnCnt, omega, eta, k);
+    outputColumn = parameters.getIntOrDefault("output_column", 1);
+  }
+
+  /** Buffers the row; nothing is emitted until {@link #terminate}. */
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (!masterRepairUtil.isNullRow(row)) {
+      masterRepairUtil.addRow(row);
+    }
+  }
+
+  /** Runs the repair over all buffered rows and emits the selected column. */
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    ArrayList<Long> times = masterRepairUtil.getTime();
+    if (times.isEmpty()) {
+      // No row was buffered, so there is nothing to repair or to emit.
+      return;
+    }
+    masterRepairUtil.repair();
+    ArrayList<Double> column = masterRepairUtil.getCleanResultColumn(this.outputColumn);
+    for (int i = 0; i < column.size(); i++) {
+      collector.putDouble(times.get(i), column.get(i));
+    }
+  }
+}
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java
new file mode 100644
index 0000000000..49f2c48da0
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/KDTreeUtil.java
@@ -0,0 +1,336 @@
+package org.apache.iotdb.library.drepair.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Stack;
+
+import static java.lang.Math.min;
+import static java.lang.Math.sqrt;
+
+/**
+ * A k-d tree over tuples of doubles, used to look up master data tuples that are close to a query
+ * tuple.
+ *
+ * <p>Distances are Euclidean with every coordinate difference divided by the matching entry of a
+ * caller supplied {@code std} array. A {@code std} entry that is zero or NaN leaves that coordinate
+ * unscaled instead of producing NaN or infinite distances.
+ */
+public class KDTreeUtil {
+  private Node kdtree;
+
+  private static class Node {
+    // Dimension this node splits on.
+    int partitiondimension;
+    // Value the split dimension is compared against.
+    double partitionValue;
+    // The stored tuple for a leaf; null for an internal node.
+    ArrayList<Double> value;
+    // Whether this node is a leaf.
+    boolean isLeaf = false;
+    // Left subtree.
+    Node left;
+    // Right subtree.
+    Node right;
+    // Per-dimension minimum of the tuples below this node (internal nodes only).
+    ArrayList<Double> min;
+    // Per-dimension maximum of the tuples below this node (internal nodes only).
+    ArrayList<Double> max;
+  }
+
+  /**
+   * Builds a tree over {@code input}.
+   *
+   * @param input the tuples to index; must contain at least one tuple
+   * @param dimension number of leading coordinates of every tuple that are indexed
+   * @return the tree
+   * @throws IllegalArgumentException if {@code input} is null or empty, because such a tree could
+   *     not answer any query
+   */
+  public static KDTreeUtil build(ArrayList<ArrayList<Double>> input, int dimension) {
+    if (input == null || input.isEmpty()) {
+      throw new IllegalArgumentException("Cannot build a k-d tree from empty data.");
+    }
+    KDTreeUtil tree = new KDTreeUtil();
+    tree.kdtree = new Node();
+    tree.buildDetail(tree.kdtree, input, dimension);
+    return tree;
+  }
+
+  /** Recursively fills {@code node} with the subtree for {@code data}. */
+  private void buildDetail(Node node, ArrayList<ArrayList<Double>> data, int dimensions) {
+    if (data.size() == 0) {
+      return;
+    }
+    if (data.size() == 1) {
+      node.isLeaf = true;
+      node.value = data.get(0);
+      return;
+    }
+    // Split on the dimension with the largest variance.
+    int bestDimension = -1;
+    double bestVariance = -1;
+    for (int i = 0; i < dimensions; i++) {
+      double variance = UtilZ.variance(data, i);
+      if (variance > bestVariance) {
+        bestVariance = variance;
+        bestDimension = i;
+      }
+    }
+    node.partitiondimension = bestDimension;
+    // A variance of 0 means all tuples are identical; no usable dimension (NaN variances or
+    // dimensions <= 0) is handled the same way. Either case becomes a leaf holding one tuple.
+    if (bestDimension < 0 || bestVariance == 0d) {
+      node.isLeaf = true;
+      node.value = data.get(0);
+      return;
+    }
+
+    // Split at the median of the chosen dimension.
+    node.partitionValue = UtilZ.median(data, node.partitiondimension);
+
+    ArrayList<ArrayList<Double>> maxmin = UtilZ.maxmin(data, dimensions);
+    node.min = maxmin.get(0);
+    node.max = maxmin.get(1);
+
+    ArrayList<ArrayList<Double>> left = new ArrayList<>();
+    ArrayList<ArrayList<Double>> right = new ArrayList<>();
+    ArrayList<ArrayList<Double>> onSplit = new ArrayList<>();
+    for (ArrayList<Double> d : data) {
+      double coordinate = d.get(node.partitiondimension);
+      if (coordinate < node.partitionValue) {
+        left.add(d);
+      } else if (coordinate > node.partitionValue) {
+        right.add(d);
+      } else if (coordinate == node.partitionValue) {
+        onSplit.add(d);
+      }
+    }
+    // Tuples exactly on the split value go right, except that one of them seeds an otherwise
+    // empty left side so that both children are non-empty.
+    for (ArrayList<Double> d : onSplit) {
+      if (left.isEmpty()) {
+        left.add(d);
+      } else {
+        right.add(d);
+      }
+    }
+
+    Node leftnode = new Node();
+    Node rightnode = new Node();
+    node.left = leftnode;
+    node.right = rightnode;
+    buildDetail(leftnode, left, dimensions);
+    buildDetail(rightnode, right, dimensions);
+  }
+
+  /**
+   * Walks from {@code node} down to a leaf following the side of each split that {@code input}
+   * falls on, pushing the sibling that was not taken onto {@code stack}.
+   */
+  private static Node descendToLeaf(Node node, ArrayList<Double> input, Stack<Node> stack) {
+    while (!node.isLeaf) {
+      if (input.get(node.partitiondimension) < node.partitionValue) {
+        stack.push(node.right);
+        node = node.left;
+      } else {
+        stack.push(node.left);
+        node = node.right;
+      }
+    }
+    return node;
+  }
+
+  /**
+   * Returns the stored tuple nearest to {@code input}.
+   *
+   * @param input the query tuple
+   * @param std per-dimension scale applied to coordinate differences
+   * @return the nearest stored tuple
+   */
+  public ArrayList<Double> query(ArrayList<Double> input, double[] std) {
+    Stack<Node> stack = new Stack<>();
+    Node leaf = descendToLeaf(kdtree, input, stack);
+    double distance = UtilZ.distance(input, leaf.value, std);
+    ArrayList<Double> nearest = queryRec(input, distance, stack, std);
+    return nearest == null ? leaf.value : nearest;
+  }
+
+  /**
+   * Examines the pending subtrees on {@code stack} and returns a tuple that is strictly closer to
+   * {@code input} than {@code distance}, or null if there is none.
+   */
+  public ArrayList<Double> queryRec(
+      ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) {
+    ArrayList<Double> nearest = null;
+    while (stack.size() != 0) {
+      Node node = stack.pop();
+      if (!node.isLeaf) {
+        // Skip the subtree when even its bounding box is not closer than the best match so far.
+        if (UtilZ.mindistance(input, node.max, node.min, std) >= distance) {
+          continue;
+        }
+        node = descendToLeaf(node, input, stack);
+      }
+      double tdis = UtilZ.distance(input, node.value, std);
+      if (tdis < distance) {
+        distance = tdis;
+        nearest = node.value;
+      }
+    }
+    return nearest;
+  }
+
+  /**
+   * Same traversal as {@link #queryRec}, but collects every tuple that improved on the best
+   * distance at the moment it was visited. The result therefore depends on the visiting order and
+   * is a candidate list rather than an exhaustive neighbor list.
+   */
+  public ArrayList<ArrayList<Double>> queryRecKNN(
+      ArrayList<Double> input, double distance, Stack<Node> stack, double[] std) {
+    ArrayList<ArrayList<Double>> nearest = new ArrayList<>();
+    while (stack.size() != 0) {
+      Node node = stack.pop();
+      if (!node.isLeaf) {
+        // mindistance is the smallest possible distance between the query tuple and the
+        // bounding box of this subtree. Only when it is below the current best distance can the
+        // subtree contain a closer tuple.
+        if (UtilZ.mindistance(input, node.max, node.min, std) >= distance) {
+          continue;
+        }
+        node = descendToLeaf(node, input, stack);
+      }
+      double tdis = UtilZ.distance(input, node.value, std);
+      if (tdis < distance) {
+        distance = tdis;
+        nearest.add(node.value);
+      }
+    }
+    return nearest;
+  }
+
+  /**
+   * Removes and returns the tuple of {@code nearest} that is closest to {@code input}.
+   *
+   * @param nearest candidate tuples; must not be empty and is modified by this call
+   */
+  public ArrayList<Double> findNearest(
+      ArrayList<Double> input, ArrayList<ArrayList<Double>> nearest, double[] std) {
+    double minDis = Double.MAX_VALUE;
+    int minIndex = 0;
+    for (int i = 0; i < nearest.size(); i++) {
+      double dis = UtilZ.distance(input, nearest.get(i), std);
+      if (dis < minDis) {
+        minDis = dis;
+        minIndex = i;
+      }
+    }
+    return nearest.remove(minIndex);
+  }
+
+  /**
+   * Returns up to {@code k} stored tuples close to {@code input}, closest first.
+   *
+   * <p>The candidates come from {@link #queryRecKNN}; when it finds none, the leaf reached by the
+   * initial descent is returned as the only element, so the result is never empty.
+   */
+  public ArrayList<ArrayList<Double>> queryKNN(ArrayList<Double> input, int k, double[] std) {
+    ArrayList<ArrayList<Double>> kNearest = new ArrayList<>();
+    Stack<Node> stack = new Stack<>();
+    Node leaf = descendToLeaf(kdtree, input, stack);
+    double distance = UtilZ.distance(input, leaf.value, std);
+    ArrayList<ArrayList<Double>> nearest = queryRecKNN(input, distance, stack, std);
+    // findNearest() shrinks the candidate list, so the number of picks has to be fixed up front;
+    // re-reading nearest.size() in the loop condition would stop after about half of them.
+    int wanted = min(k, nearest.size());
+    for (int i = 0; i < wanted; i++) {
+      kNearest.add(findNearest(input, nearest, std));
+    }
+    if (kNearest.size() == 0) {
+      kNearest.add(leaf.value);
+    }
+    return kNearest;
+  }
+
+  /** A tuple paired with a distance; ordered by ascending distance. */
+  public static class TupleWithDistance implements Comparable<TupleWithDistance> {
+    private final Double distance;
+    private final ArrayList<Double> tuple;
+
+    public TupleWithDistance(Double distance, ArrayList<Double> tuple) {
+      this.distance = distance;
+      this.tuple = tuple;
+    }
+
+    @Override
+    public int compareTo(TupleWithDistance t) {
+      return this.distance.compareTo(t.distance);
+    }
+
+    public Double getDistance() {
+      return distance;
+    }
+
+    public ArrayList<Double> getTuple() {
+      return tuple;
+    }
+  }
+
+  /** Numeric helpers used while building and searching the tree. */
+  private static class UtilZ {
+
+    /** Population variance of one coordinate over all tuples. */
+    static double variance(ArrayList<ArrayList<Double>> data, int dimension) {
+      double sum = 0d;
+      for (ArrayList<Double> d : data) {
+        sum += d.get(dimension);
+      }
+      double avg = sum / data.size();
+      double ans = 0d;
+      for (ArrayList<Double> d : data) {
+        double temp = d.get(dimension) - avg;
+        ans += temp * temp;
+      }
+      return ans / data.size();
+    }
+
+    /** Upper median (element at index size / 2 after sorting) of one coordinate. */
+    static double median(ArrayList<ArrayList<Double>> data, int dimension) {
+      ArrayList<Double> d = new ArrayList<>();
+      for (ArrayList<Double> k : data) {
+        d.add(k.get(dimension));
+      }
+      Collections.sort(d);
+      int pos = d.size() / 2;
+      return d.get(pos);
+    }
+
+    /**
+     * Per-dimension bounds of {@code data}.
+     *
+     * @return a list whose first element holds the minima and whose second element holds the
+     *     maxima
+     */
+    static ArrayList<ArrayList<Double>> maxmin(ArrayList<ArrayList<Double>> data, int dimensions) {
+      ArrayList<ArrayList<Double>> mm = new ArrayList<>();
+      ArrayList<Double> minV = new ArrayList<>();
+      ArrayList<Double> maxV = new ArrayList<>();
+      for (int i = 0; i < dimensions; i++) {
+        // Double.MIN_VALUE is the smallest positive double, not the most negative one, so the
+        // maximum has to start from negative infinity.
+        double minTemp = Double.POSITIVE_INFINITY;
+        double maxTemp = Double.NEGATIVE_INFINITY;
+        // Every tuple counts, including the first, and one value may update both bounds.
+        for (ArrayList<Double> d : data) {
+          double coordinate = d.get(i);
+          if (coordinate < minTemp) {
+            minTemp = coordinate;
+          }
+          if (coordinate > maxTemp) {
+            maxTemp = coordinate;
+          }
+        }
+        minV.add(minTemp);
+        maxV.add(maxTemp);
+      }
+      mm.add(minV);
+      mm.add(maxV);
+      return mm;
+    }
+
+    /** Divides {@code diff} by {@code scale}, leaving it unchanged when the scale is 0 or NaN. */
+    static double scaled(double diff, double scale) {
+      return (scale == 0d || Double.isNaN(scale)) ? diff : diff / scale;
+    }
+
+    /** Scaled Euclidean distance; coordinates that are null in either tuple are skipped. */
+    static double distance(ArrayList<Double> a, ArrayList<Double> b, double[] std) {
+      double sum = 0d;
+      for (int i = 0; i < a.size(); i++) {
+        if (a.get(i) != null && b.get(i) != null) {
+          double diff = scaled(a.get(i) - b.get(i), std[i]);
+          sum += diff * diff;
+        }
+      }
+      return sqrt(sum);
+    }
+
+    /**
+     * Smallest possible scaled distance between {@code a} and any point of the box spanned by
+     * {@code min} and {@code max}; null coordinates of {@code a} are skipped as in {@link
+     * #distance}.
+     */
+    static double mindistance(
+        ArrayList<Double> a, ArrayList<Double> max, ArrayList<Double> min, double[] std) {
+      double sum = 0d;
+      for (int i = 0; i < a.size(); i++) {
+        if (a.get(i) == null) {
+          continue;
+        }
+        double diff = 0d;
+        if (a.get(i) > max.get(i)) {
+          diff = scaled(a.get(i) - max.get(i), std[i]);
+        } else if (a.get(i) < min.get(i)) {
+          diff = scaled(min.get(i) - a.get(i), std[i]);
+        }
+        sum += diff * diff;
+      }
+      return sqrt(sum);
+    }
+  }
+}
diff --git 
a/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java
 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java
new file mode 100644
index 0000000000..f99a81a731
--- /dev/null
+++ 
b/library-udf/src/main/java/org/apache/iotdb/library/drepair/util/MasterRepairUtil.java
@@ -0,0 +1,260 @@
+package org.apache.iotdb.library.drepair.util;
+
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.udf.api.access.Row;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Buffers observed tuples and master data tuples and repairs the observed ones by replacing each
+ * with a nearby master tuple that stays within distance {@code eta} of the already repaired tuples
+ * inside the preceding time window {@code omega}.
+ *
+ * <p>Usage: call {@link #addRow} for every input row, then {@link #repair} once, then read the
+ * result with {@link #getTime} and {@link #getCleanResultColumn}.
+ */
+public class MasterRepairUtil {
+  // Observed tuples, their timestamps and (after repair) their repaired counterparts; the three
+  // lists are index-aligned.
+  private final ArrayList<ArrayList<Double>> td = new ArrayList<>();
+  private final ArrayList<ArrayList<Double>> td_cleaned = new ArrayList<>();
+  private final ArrayList<Long> td_time = new ArrayList<>();
+  // Master data tuples; only tuples without missing values are kept.
+  private final ArrayList<ArrayList<Double>> md = new ArrayList<>();
+  private final int columnCnt;
+  private long omega;
+  private double eta;
+  private int k;
+  private double[] std;
+  private KDTreeUtil kdTreeUtil;
+
+  /**
+   * @param columnCnt number of observed columns; the master columns follow them in every row
+   * @param omega window size in timestamp units, or -1 to estimate it from the data
+   * @param eta distance threshold, or NaN to estimate it from the data
+   * @param k number of master neighbors per lookup, or -1 to estimate it from the data
+   */
+  public MasterRepairUtil(int columnCnt, long omega, double eta, int k) throws Exception {
+    this.columnCnt = columnCnt;
+    this.omega = omega;
+    this.eta = eta;
+    this.k = k;
+  }
+
+  /** Returns true when every column of {@code row} is null. */
+  public boolean isNullRow(Row row) {
+    for (int i = 0; i < row.size(); i++) {
+      if (!row.isNull(i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Splits {@code row} into its observed part (first {@code columnCnt} columns) and its master
+   * part (remaining columns) and buffers both.
+   *
+   * <p>The observed part is kept when it has at least one value; missing values are filled later
+   * by {@link #fillNullValue}. The master part is kept only when it is complete, because the k-d
+   * tree built from it cannot hold missing coordinates.
+   */
+  public void addRow(Row row) throws Exception {
+    ArrayList<Double> tt = readTuple(row, 0, this.columnCnt); // time-series tuple
+    boolean containsNotNull = false;
+    for (Double value : tt) {
+      if (value != null) {
+        containsNotNull = true;
+        break;
+      }
+    }
+    if (containsNotNull) {
+      td.add(tt);
+      td_time.add(row.getTime());
+    }
+
+    ArrayList<Double> mt = readTuple(row, this.columnCnt, row.size()); // master tuple
+    if (!mt.isEmpty() && !mt.contains(null)) {
+      md.add(mt);
+    }
+  }
+
+  /**
+   * Reads columns {@code [from, to)} of {@code row} as doubles. Null columns and non-finite values
+   * (NaN, infinities) are stored as null, i.e. treated as missing.
+   */
+  private ArrayList<Double> readTuple(Row row, int from, int to) throws Exception {
+    ArrayList<Double> tuple = new ArrayList<>();
+    for (int i = from; i < to; i++) {
+      Double value = null;
+      if (!row.isNull(i)) {
+        double raw = Util.getValueAsDouble(row, i);
+        if (!Double.isNaN(raw) && !Double.isInfinite(raw)) {
+          value = raw;
+        }
+      }
+      tuple.add(value);
+    }
+    return tuple;
+  }
+
+  /** Indexes the buffered master data in a k-d tree. */
+  public void buildKDTree() {
+    this.kdTreeUtil = KDTreeUtil.build(md, this.columnCnt);
+  }
+
+  /**
+   * Returns one column of the repaired tuples.
+   *
+   * @param columnPos 1-based column index
+   */
+  public ArrayList<Double> getCleanResultColumn(int columnPos) {
+    ArrayList<Double> column = new ArrayList<>();
+    for (ArrayList<Double> tuple : this.td_cleaned) {
+      column.add(tuple.get(columnPos - 1));
+    }
+    return column;
+  }
+
+  /** Returns the timestamps of the buffered observed tuples (the internal list, not a copy). */
+  public ArrayList<Long> getTime() {
+    return td_time;
+  }
+
+  /** Euclidean distance over the observed columns, each difference divided by its std. */
+  public double get_tm_distance(ArrayList<Double> t_tuple, ArrayList<Double> m_tuple) {
+    double distance = 0d;
+    for (int pos = 0; pos < columnCnt; pos++) {
+      double temp = t_tuple.get(pos) - m_tuple.get(pos);
+      temp = temp / std[pos];
+      distance += temp * temp;
+    }
+    distance = Math.sqrt(distance);
+    return distance;
+  }
+
+  /**
+   * Returns the indexes of the tuples before tuple {@code i} whose timestamp lies within {@code
+   * omega} of it, most recent first.
+   */
+  public ArrayList<Integer> cal_W(int i) {
+    ArrayList<Integer> W_i = new ArrayList<>();
+    for (int l = i - 1; l >= 0; l--) {
+      if (this.td_time.get(i) <= this.td_time.get(l) + omega) {
+        W_i.add(l);
+      } else {
+        // Assumes ascending timestamps, like the window scan in set_parameters(): once a tuple
+        // is outside the window, all earlier ones are too.
+        break;
+      }
+    }
+    return W_i;
+  }
+
+  /**
+   * Collects the candidate repairs for tuple {@code i}: the master neighbors of the tuple itself
+   * and of every repaired tuple in its window, or just the single nearest master tuple when the
+   * window is empty.
+   */
+  public ArrayList<ArrayList<Double>> cal_C(int i, ArrayList<Integer> W_i) {
+    ArrayList<ArrayList<Double>> C_i = new ArrayList<>();
+    if (W_i.size() == 0) {
+      C_i.add(this.kdTreeUtil.query(this.td.get(i), std));
+    } else {
+      C_i.addAll(this.kdTreeUtil.queryKNN(this.td.get(i), k, std));
+      for (Integer integer : W_i) {
+        C_i.addAll(this.kdTreeUtil.queryKNN(this.td_cleaned.get(integer), k, std));
+      }
+    }
+    return C_i;
+  }
+
+  /**
+   * Repairs every buffered tuple in time order. Among the candidates that are within {@code eta}
+   * of all repaired tuples in the window, the one closest to the observed tuple is chosen.
+   */
+  public void master_repair() {
+    // Start from scratch so that a repeated call does not append a second set of results.
+    this.td_cleaned.clear();
+    for (int i = 0; i < this.td.size(); i++) {
+      ArrayList<Double> tuple = this.td.get(i);
+      ArrayList<Integer> W_i = cal_W(i);
+      ArrayList<ArrayList<Double>> C_i = this.cal_C(i, W_i);
+      double min_dis = Double.MAX_VALUE;
+      // When no candidate passes the smoothness test the observed tuple is kept as it is. An
+      // empty placeholder would break every later distance computation and the column readout.
+      ArrayList<Double> repair_tuple = tuple;
+      for (ArrayList<Double> c_i : C_i) {
+        boolean smooth = true;
+        for (Integer w_i : W_i) {
+          ArrayList<Double> w_is = td_cleaned.get(w_i);
+          if (get_tm_distance(c_i, w_is) > eta) {
+            smooth = false;
+            break;
+          }
+        }
+        if (smooth) {
+          double dis = get_tm_distance(c_i, tuple);
+          if (dis < min_dis) {
+            min_dis = dis;
+            repair_tuple = c_i;
+          }
+        }
+      }
+      this.td_cleaned.add(repair_tuple);
+    }
+  }
+
+  /**
+   * Estimates the parameters that were left at their sentinel value.
+   *
+   * <ul>
+   *   <li>{@code omega}: ten times the median interval between consecutive timestamps.
+   *   <li>{@code eta}: the 99.73% quantile of the distances between tuples in the same window.
+   *   <li>{@code k}: the first value in 2..5 for which the 90% quantile of the distances to the
+   *       returned master neighbors exceeds {@code eta}; 5 if there is none.
+   * </ul>
+   */
+  public void set_parameters() {
+    if (omega == -1) {
+      ArrayList<Long> intervals = getIntervals();
+      if (intervals.isEmpty()) {
+        // A single tuple has no interval; it has no window either, so any size works.
+        omega = 0;
+      } else {
+        Collections.sort(intervals);
+        long interval = intervals.get(intervals.size() / 2);
+        omega = interval * 10;
+      }
+    }
+    if (Double.isNaN(eta)) {
+      ArrayList<Double> distance_list = new ArrayList<>();
+      for (int i = 1; i < this.td.size(); i++) {
+        for (int l = i - 1; l >= 0; l--) {
+          if (this.td_time.get(i) <= this.td_time.get(l) + omega) {
+            distance_list.add(get_tm_distance(this.td.get(i), this.td.get(l)));
+          } else {
+            break;
+          }
+        }
+      }
+      if (distance_list.isEmpty()) {
+        // No two tuples share a window, so the smoothness test is never applied.
+        eta = Double.POSITIVE_INFINITY;
+      } else {
+        Collections.sort(distance_list);
+        eta = distance_list.get((int) (distance_list.size() * 0.9973));
+      }
+    }
+    if (k == -1) {
+      for (int temp_k = 2; temp_k <= 5; temp_k++) {
+        ArrayList<Double> distance_list = new ArrayList<>();
+        for (ArrayList<Double> tuple : this.td) {
+          ArrayList<ArrayList<Double>> neighbors = this.kdTreeUtil.queryKNN(tuple, temp_k, std);
+          for (ArrayList<Double> neighbor : neighbors) {
+            distance_list.add(get_tm_distance(tuple, neighbor));
+          }
+        }
+        if (distance_list.isEmpty()) {
+          continue;
+        }
+        Collections.sort(distance_list);
+        if (distance_list.get((int) (distance_list.size() * 0.9)) > eta) {
+          k = temp_k;
+          break;
+        }
+      }
+      if (k == -1) {
+        k = 5;
+      }
+    }
+  }
+
+  /** Population variance of {@code value}, ignoring NaN entries; 0 when nothing is left. */
+  private double varianceImperative(double[] value) {
+    double average = 0.0;
+    int cnt = 0;
+    for (double p : value) {
+      if (!Double.isNaN(p)) {
+        cnt += 1;
+        average += p;
+      }
+    }
+    if (cnt == 0) {
+      return 0d;
+    }
+    average /= cnt;
+
+    double variance = 0.0;
+    for (double p : value) {
+      if (!Double.isNaN(p)) {
+        variance += (p - average) * (p - average);
+      }
+    }
+    return variance / cnt;
+  }
+
+  /** Copies one observed column into an array; requires the nulls to be filled already. */
+  private double[] getColumn(int pos) {
+    double[] column = new double[this.td.size()];
+    for (int i = 0; i < this.td.size(); i++) {
+      column[i] = this.td.get(i).get(pos);
+    }
+    return column;
+  }
+
+  /**
+   * Computes the standard deviation of every observed column, used to scale distances. A constant
+   * column gets a scale of 1 so that dividing by it cannot produce NaN or infinity.
+   */
+  public void call_std() {
+    this.std = new double[this.columnCnt];
+    for (int i = 0; i < this.columnCnt; i++) {
+      double deviation = Math.sqrt(varianceImperative(getColumn(i)));
+      std[i] = deviation > 0 ? deviation : 1d;
+    }
+  }
+
+  /**
+   * Runs the whole pipeline: fill missing values, index the master data, compute the scales,
+   * estimate unset parameters and repair.
+   *
+   * @throws IllegalStateException if observed tuples were buffered but no complete master tuple
+   */
+  public void repair() {
+    if (this.td.isEmpty()) {
+      // Nothing was buffered, so there is nothing to repair.
+      return;
+    }
+    if (this.md.isEmpty()) {
+      throw new IllegalStateException("No complete master data tuple is available for repairing.");
+    }
+    fillNullValue();
+    buildKDTree();
+    call_std();
+    set_parameters();
+    master_repair();
+  }
+
+  /** Returns the differences between consecutive timestamps. */
+  public ArrayList<Long> getIntervals() {
+    ArrayList<Long> intervals = new ArrayList<>();
+    for (int i = 1; i < this.td_time.size(); i++) {
+      intervals.add(this.td_time.get(i) - this.td_time.get(i - 1));
+    }
+    return intervals;
+  }
+
+  /**
+   * Fills missing observed values column by column with the previous value of the same column;
+   * values missing before the first present one take that first present value.
+   *
+   * @throws IllegalStateException if a column has no value at all
+   */
+  public void fillNullValue() {
+    if (this.td.isEmpty()) {
+      return;
+    }
+    for (int i = 0; i < columnCnt; i++) {
+      // Seed with the first present value; the first tuple itself may have a null here.
+      Double temp = null;
+      for (ArrayList<Double> arrayList : this.td) {
+        if (arrayList.get(i) != null) {
+          temp = arrayList.get(i);
+          break;
+        }
+      }
+      if (temp == null) {
+        throw new IllegalStateException(
+            "Input column " + (i + 1) + " contains no value and cannot be repaired.");
+      }
+      for (ArrayList<Double> arrayList : this.td) {
+        if (arrayList.get(i) == null) {
+          arrayList.set(i, temp);
+        } else {
+          temp = arrayList.get(i);
+        }
+      }
+    }
+  }
+}

Reply via email to