This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch TOP-K-DTW
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 71f656e715580402a92d24f90f07fbf7a06bb878
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jun 1 22:37:47 2023 +0800

    sliding window
---
 .../BuiltinTimeSeriesGeneratingFunction.java       |   3 +-
 .../iotdb/commons/udf/builtin/UDTFTopKDTW.java     |  45 +++++++
 .../udf/builtin/UDTFTopKDTWSlidingWindow.java      | 134 +++++++++++++++++++++
 3 files changed, 181 insertions(+), 1 deletion(-)

diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
index ece0e3104a6..8655dd78038 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java
@@ -90,7 +90,8 @@ public enum BuiltinTimeSeriesGeneratingFunction {
       "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE", 
UDTFEqualSizeBucketOutlierSample.class),
   JEXL("JEXL", UDTFJexl.class),
   MASTER_REPAIR("MASTER_REPAIR", UDTFMasterRepair.class),
-  M4("M4", UDTFM4.class);
+  M4("M4", UDTFM4.class),
+  TOP_K_DTW_SLIDING_WINDOW("TOP_K_DTW_SLIDING_WINDOW", 
UDTFTopKDTWSlidingWindow.class);
 
   private final String functionName;
   private final Class<?> functionClass;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java
new file mode 100644
index 00000000000..49705452772
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java
@@ -0,0 +1,45 @@
+package org.apache.iotdb.commons.udf.builtin;
+
+import org.apache.iotdb.udf.api.UDTF;
+import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shared base of the built-in top-k DTW UDTFs.
+ *
+ * <p>It checks the query parameters, reads the {@code k} and {@code batchSize} attributes into
+ * fields for subclasses, and configures a sliding size window of {@code batchSize} rows as the
+ * access strategy.
+ */
+public abstract class UDTFTopKDTW implements UDTF {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(UDTFTopKDTW.class);
+
+  /** Name of the required attribute holding the number of results to keep. */
+  private static final String K = "k";
+
+  /** Name of the optional attribute holding the window size in rows. */
+  private static final String BATCH_SIZE = "batchSize";
+
+  /** Window size used when the {@code batchSize} attribute is absent. */
+  private static final int DEFAULT_BATCH_SIZE = 65536;
+
+  /** Value of the {@code k} attribute; assigned in {@link #beforeStart}. */
+  protected int k;
+
+  /** Value of the {@code batchSize} attribute; assigned in {@link #beforeStart}. */
+  protected int batchSize;
+
+  /**
+   * Validates the input series number against 2 and requires the {@code k} attribute.
+   *
+   * @param validator validator supplied by the UDF framework
+   * @throws Exception if the validator rejects the parameters
+   */
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator.validateInputSeriesNumber(2).validateRequiredAttribute(K);
+  }
+
+  /**
+   * Reads {@code k} and {@code batchSize}, rejects non-positive values and installs the sliding
+   * size window access strategy.
+   *
+   * @param parameters attributes of the UDF invocation
+   * @param configurations configuration object that receives the access strategy
+   * @throws Exception if {@code k} or {@code batchSize} is not positive, or reading fails
+   */
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    // Each field is assigned before it is checked, and k is handled before batchSize is read.
+    k = parameters.getInt(K);
+    requirePositive(k, K);
+
+    batchSize = parameters.getIntOrDefault(BATCH_SIZE, DEFAULT_BATCH_SIZE);
+    requirePositive(batchSize, BATCH_SIZE);
+
+    SlidingSizeWindowAccessStrategy strategy = new SlidingSizeWindowAccessStrategy(batchSize);
+    configurations.setAccessStrategy(strategy);
+  }
+
+  /** Throws with the message {@code "<attribute> must be positive"} unless {@code value > 0}. */
+  private static void requirePositive(int value, String attribute)
+      throws UDFParameterNotValidException {
+    if (value > 0) {
+      return;
+    }
+    throw new UDFParameterNotValidException(attribute + " must be positive");
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java
new file mode 100644
index 00000000000..a83328e3e9d
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin;
+
+import org.apache.iotdb.udf.api.access.Row;
+import org.apache.iotdb.udf.api.access.RowIterator;
+import org.apache.iotdb.udf.api.access.RowWindow;
+import org.apache.iotdb.udf.api.collector.PointCollector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Top-k DTW UDTF that matches a pattern against a data series row by row.
+ *
+ * <p>Column 0 of each row is the data series and column 1 is the pattern series. The pattern is
+ * read once, from the first window, up to the first row whose column 1 is null. For every row
+ * with a non-null data value one dynamic-programming column is computed; the path that reaches
+ * the last pattern point is a candidate, and the {@code k} candidates with the smallest
+ * accumulated distance are kept and emitted in {@link #terminate}.
+ */
+public class UDTFTopKDTWSlidingWindow extends UDTFTopKDTW {
+
+  /** One point of the pattern series. */
+  private static class DTWPoint {
+
+    private final long time;
+    private final double value;
+
+    private DTWPoint(long time, double value) {
+      this.time = time;
+      this.value = value;
+    }
+  }
+
+  /**
+   * Immutable summary of one warping path: where it started in the data series, its accumulated
+   * distance and the number of steps taken so far.
+   */
+  private static class DTWPath {
+
+    private final long startTime;
+    private final double distance;
+    private final int length;
+
+    private DTWPath(long startTime, double distance, int length) {
+      this.startTime = startTime;
+      this.distance = distance;
+      this.length = length;
+    }
+
+    /** Returns a new path that continues this one by a single step of the given cost. */
+    private DTWPath extend(double stepDistance) {
+      return new DTWPath(startTime, distance + stepDistance, length + 1);
+    }
+  }
+
+  /** Pattern points; {@code null} until the first window has been seen. */
+  private DTWPoint[] pattern;
+
+  /** Paths ending at the previously processed data row, indexed by pattern position. */
+  private DTWPath[] dtwBefore;
+
+  /** Paths ending at the data row currently being processed, indexed by pattern position. */
+  private DTWPath[] dtwCurrent;
+
+  /** Max-heap on distance, so the worst of the retained candidates is at the head. */
+  private final PriorityQueue<DTWPath> topK =
+      new PriorityQueue<>((o1, o2) -> Double.compare(o2.distance, o1.distance));
+
+  /**
+   * Consumes one window: loads the pattern on first use, then advances the DTW computation by
+   * every row whose data value is non-null.
+   *
+   * @param rowWindow rows of the current window
+   * @param collector unused here; results are emitted in {@link #terminate}
+   * @throws IllegalStateException if a data row must be processed but the pattern is empty
+   * @throws Exception if reading a row fails
+   */
+  @Override
+  public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
+    if (pattern == null) {
+      pattern = readPattern(rowWindow);
+    }
+
+    RowIterator iterator = rowWindow.getRowIterator();
+    while (iterator.hasNextRow()) {
+      Row row = iterator.next();
+      if (row.isNull(0)) {
+        continue;
+      }
+      if (pattern.length == 0) {
+        // Fail with a clear message instead of indexing position -1 of an empty array below.
+        throw new IllegalStateException(
+            "The pattern (second input series) is empty: its first row in the first window is null");
+      }
+
+      advance(row.getTime(), row.getDouble(0));
+      offerCandidate(dtwCurrent[dtwCurrent.length - 1]);
+    }
+  }
+
+  /** Collects column 1 of the window's rows as pattern points, stopping at the first null. */
+  private DTWPoint[] readPattern(RowWindow rowWindow) throws Exception {
+    List<DTWPoint> patternList = new ArrayList<>();
+    RowIterator iterator = rowWindow.getRowIterator();
+    while (iterator.hasNextRow()) {
+      Row row = iterator.next();
+      if (row.isNull(1)) {
+        break;
+      }
+      patternList.add(new DTWPoint(row.getTime(), row.getDouble(1)));
+    }
+    return patternList.toArray(new DTWPoint[0]);
+  }
+
+  /** Computes the paths ending at the given data point and makes them the "previous" column. */
+  private void advance(long time, double value) {
+    dtwCurrent = new DTWPath[pattern.length];
+
+    // A match may begin at any data row, so position 0 always opens a fresh path. Its start time
+    // is the data row's timestamp; using the pattern's timestamp would give every path the same
+    // start time.
+    dtwCurrent[0] = new DTWPath(time, Math.abs(value - pattern[0].value), 1);
+
+    for (int i = 1; i < pattern.length; i++) {
+      // Cheapest predecessor among: same row / previous pattern point, previous row / same
+      // pattern point, previous row / previous pattern point. Ties keep the earlier choice.
+      DTWPath best = dtwCurrent[i - 1];
+      if (dtwBefore != null) {
+        if (dtwBefore[i].distance < best.distance) {
+          best = dtwBefore[i];
+        }
+        if (dtwBefore[i - 1].distance < best.distance) {
+          best = dtwBefore[i - 1];
+        }
+      }
+      dtwCurrent[i] = best.extend(Math.abs(value - pattern[i].value));
+    }
+
+    // A shallow copy is enough because DTWPath is immutable.
+    dtwBefore = Arrays.copyOf(dtwCurrent, dtwCurrent.length);
+  }
+
+  /** Retains the candidate if fewer than k are held or it beats the current worst one. */
+  private void offerCandidate(DTWPath candidate) {
+    if (topK.size() < k) {
+      topK.offer(candidate);
+    } else if (topK.peek().distance > candidate.distance) {
+      topK.poll();
+      topK.offer(candidate);
+    }
+  }
+
+  /**
+   * Emits the retained candidates, largest distance first, keyed by each path's start time.
+   *
+   * @param collector receives the distance and the length of every retained path
+   * @throws Exception if the collector rejects a point
+   */
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    // NOTE(review): a double and an int are written at the same timestamp, and timestamps follow
+    // heap order rather than time order - confirm the collector and the configured output data
+    // type accept this.
+    while (!topK.isEmpty()) {
+      DTWPath path = topK.poll();
+      collector.putDouble(path.startTime, path.distance);
+      collector.putInt(path.startTime, path.length);
+    }
+  }
+}

Reply via email to