This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch TOP-K-DTW in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 71f656e715580402a92d24f90f07fbf7a06bb878 Author: YongzaoDan <[email protected]> AuthorDate: Thu Jun 1 22:37:47 2023 +0800 sliding window --- .../BuiltinTimeSeriesGeneratingFunction.java | 3 +- .../iotdb/commons/udf/builtin/UDTFTopKDTW.java | 45 +++++++ .../udf/builtin/UDTFTopKDTWSlidingWindow.java | 134 +++++++++++++++++++++ 3 files changed, 181 insertions(+), 1 deletion(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java index ece0e3104a6..8655dd78038 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinTimeSeriesGeneratingFunction.java @@ -90,7 +90,8 @@ public enum BuiltinTimeSeriesGeneratingFunction { "EQUAL_SIZE_BUCKET_OUTLIER_SAMPLE", UDTFEqualSizeBucketOutlierSample.class), JEXL("JEXL", UDTFJexl.class), MASTER_REPAIR("MASTER_REPAIR", UDTFMasterRepair.class), - M4("M4", UDTFM4.class); + M4("M4", UDTFM4.class), + TOP_K_DTW_SLIDING_WINDOW("TOP_K_DTW_SLIDING_WINDOW", UDTFTopKDTWSlidingWindow.class); private final String functionName; private final Class<?> functionClass; diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java new file mode 100644 index 00000000000..49705452772 --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java @@ -0,0 +1,45 @@ +package org.apache.iotdb.commons.udf.builtin; + +import org.apache.iotdb.udf.api.UDTF; +import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations; +import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator; +import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; +import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; +import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Base class for the TOP_K_DTW UDTFs: validate() requires exactly two input series plus the {@code k} attribute, and beforeStart() checks that {@code k} and {@code batchSize} are positive, then installs a SlidingSizeWindowAccessStrategy of {@code batchSize}. The subclass in this commit treats input series 0 as the searched series and input series 1 as the pattern. NOTE(review): unlike UDTFTopKDTWSlidingWindow.java, this new file has no Apache license header, which the release (RAT) check may reject. */ public abstract class UDTFTopKDTW implements UDTF { + + protected static final Logger LOGGER = LoggerFactory.getLogger(UDTFTopKDTW.class); /* NOTE(review): not referenced anywhere in this commit. */ + + private static final int DEFAULT_BATCH_SIZE = 65536; /* Window size passed to SlidingSizeWindowAccessStrategy when the batchSize attribute is absent. */ + + private static final String K = "k"; + private static final String BATCH_SIZE = "batchSize"; + + protected int k; /* Number of matches the subclass keeps; validated to be > 0 in beforeStart(). */ + protected int batchSize; /* Size of each SlidingSizeWindowAccessStrategy window; validated to be > 0 in beforeStart(). */ + + @Override + public void validate(UDFParameterValidator validator) throws Exception { + validator.validateInputSeriesNumber(2).validateRequiredAttribute(K); /* Exactly two input series; k is mandatory, batchSize is optional (defaulted in beforeStart). */ + } + + @Override + public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) + throws Exception { + k = parameters.getInt(K); + if (k <= 0) { + throw new UDFParameterNotValidException("k must be positive"); + } + + batchSize = parameters.getIntOrDefault(BATCH_SIZE, DEFAULT_BATCH_SIZE); + if (batchSize <= 0) { + throw new UDFParameterNotValidException("batchSize must be positive"); + } + + configurations.setAccessStrategy(new SlidingSizeWindowAccessStrategy(batchSize)); /* NOTE(review): no output data type is configured on configurations here; confirm the framework does not require one, especially since UDTFTopKDTWSlidingWindow.terminate emits both double and int values. */ + } +} diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java new file mode 100644 index 00000000000..a83328e3e9d --- /dev/null +++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.builtin; + +import org.apache.iotdb.udf.api.access.Row; +import org.apache.iotdb.udf.api.access.RowIterator; +import org.apache.iotdb.udf.api.access.RowWindow; +import org.apache.iotdb.udf.api.collector.PointCollector; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.PriorityQueue; + +/** Streaming subsequence-DTW search, registered as TOP_K_DTW_SLIDING_WINDOW: keeps the {@code k} lowest-distance alignments of the pattern (input series 1) against the searched series (input series 0). Every non-null value of series 0 may start a new match, and the DTW column is carried across windows, so a match may span window boundaries. Nothing is emitted until terminate(). */ public class UDTFTopKDTWSlidingWindow extends UDTFTopKDTW { + + /** One (timestamp, value) point of the pattern series. */ private static class DTWPoint { + + private final long time; + private final double value; + + private DTWPoint(long time, double value) { + this.time = time; + this.value = value; + } + } + + /** A warping path: the timestamp recorded when it was opened, its accumulated absolute-difference distance, and the number of cells on it. */ private static class DTWPath { + + private final long startTime; + private double distance; + private int length; + + private DTWPath(long startTime, double distance, int length) { + this.startTime = startTime; + this.distance = distance; + this.length = length; + } + + /** Paths are copied before being extended in transform(), so entries of the previous column are never mutated. */ private DTWPath copy() { + return new DTWPath(startTime, distance, length); + } + } + + private DTWPoint[] pattern; /* Null until the first window has been read; an empty array if series 1 was null on the first row. */ + private DTWPath[] dtwBefore; /* DTW column for the previous non-null value of series 0; null before the first one. Never reset, so it persists across windows. */ + private DTWPath[] dtwCurrent; /* Only used inside transform() and reallocated for every row; could be a local. */ + + private final PriorityQueue<DTWPath> topK = + new PriorityQueue<>((o1, o2) -> Double.compare(o2.distance, o1.distance)); /* Reversed distance order: the head is the worst (largest-distance) path kept, i.e. the one evicted first. */ + + @Override + public void transform(RowWindow rowWindow, PointCollector collector) throws Exception { /* Nothing is emitted here; collector is unused. */ + if (pattern == null) { /* NOTE(review): the pattern is read from the first window only and stops at the first row whose series 1 is null, so a pattern longer than one window is truncated. If series 1 is null on the first row, the pattern is empty and dtwCurrent[dtwCurrent.length - 1] below throws ArrayIndexOutOfBoundsException on the first non-null series-0 value. If the two series' timestamps interleave, a row carrying only series 0 may also cut the pattern short; confirm how rows are aligned. */ + // Read pattern + RowIterator iterator =
rowWindow.getRowIterator(); + List<DTWPoint> patternList = new ArrayList<>(); + while (iterator.hasNextRow()) { + Row row = iterator.next(); + if (row.isNull(1)) { + break; + } + patternList.add(new DTWPoint(row.getTime(), row.getDouble(1))); + } + pattern = patternList.toArray(new DTWPoint[0]); + } + + RowIterator iterator = rowWindow.getRowIterator(); + while (iterator.hasNextRow()) { + Row row = iterator.next(); + if (row.isNull(0)) { /* Rows without a searched-series value are skipped entirely. */ + continue; + } + + double value = row.getDouble(0); + dtwCurrent = new DTWPath[pattern.length]; + for (int i = 0; i < pattern.length; i++) { + double currentDistance = Math.abs(value - pattern[i].value); + if (i == 0) { + // Start a new DTW path (open begin: every row may start a match) + dtwCurrent[i] = new DTWPath(pattern[i].time, currentDistance, 1); /* NOTE(review): startTime is pattern[0].time rather than row.getTime(), so every path, and every point emitted in terminate(), gets the same timestamp; row.getTime() looks intended. */ + continue; + } + + // Find the optimal DTW path from previous: cheapest of dtwCurrent[i - 1], dtwBefore[i] and dtwBefore[i - 1] (only the first when dtwBefore is null); strict < keeps the earlier candidate on ties + dtwCurrent[i] = dtwCurrent[i - 1].copy(); + if (dtwBefore != null) { + if (dtwBefore[i].distance < dtwCurrent[i].distance) { + dtwCurrent[i] = dtwBefore[i].copy(); + } + if (dtwBefore[i - 1].distance < dtwCurrent[i].distance) { + dtwCurrent[i] = dtwBefore[i - 1].copy(); + } + } + dtwCurrent[i].distance += currentDistance; + dtwCurrent[i].length++; + } + dtwBefore = Arrays.copyOf(dtwCurrent, dtwCurrent.length); /* dtwCurrent is a fresh array for each row, so this shallow copy is equivalent to plain assignment. */ + DTWPath currentPath = dtwCurrent[dtwCurrent.length - 1]; /* Path ending at the last pattern point. NOTE(review): one candidate is offered per row, so overlapping near-duplicate matches are not de-duplicated and can fill the whole top-k. */ + if (topK.size() < k) { + topK.offer(currentPath); + } else if (topK.peek().distance > currentPath.distance) { /* Replace the current worst only if strictly better. */ + topK.poll(); + topK.offer(currentPath); + } + } + } + + @Override + public void terminate(PointCollector collector) throws Exception { /* NOTE(review): paths are polled in descending-distance order, not time order, and each one writes a double and then an int at the same timestamp; the collector presumably expects a single output type and increasing timestamps - confirm. */ + while (!topK.isEmpty()) { + DTWPath path = topK.poll(); + collector.putDouble(path.startTime, path.distance); + collector.putInt(path.startTime, path.length); + } + } +}
