This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch TOP-K-DTW in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c9b42131097a4e835a04457eb884ad7ec29f7dc4 Author: YongzaoDan <[email protected]> AuthorDate: Thu Jun 1 22:46:26 2023 +0800 uniformity --- .../iotdb/commons/udf/builtin/UDTFTopKDTW.java | 8 ++++- .../udf/builtin/UDTFTopKDTWSlidingWindow.java | 35 ++++++++++++++-------- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java index 49705452772..9a9cb3be1f4 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTW.java @@ -6,6 +6,7 @@ import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator; import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; import org.apache.iotdb.udf.api.exception.UDFParameterNotValidException; +import org.apache.iotdb.udf.api.type.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,6 +20,9 @@ public abstract class UDTFTopKDTW implements UDTF { private static final String K = "k"; private static final String BATCH_SIZE = "batchSize"; + protected static final int COLUMN_S = 0; + protected static final int COLUMN_P = 1; + protected int k; protected int batchSize; @@ -40,6 +44,8 @@ public abstract class UDTFTopKDTW implements UDTF { throw new UDFParameterNotValidException("batchSize must be positive"); } - configurations.setAccessStrategy(new SlidingSizeWindowAccessStrategy(batchSize)); + configurations + .setAccessStrategy(new SlidingSizeWindowAccessStrategy(batchSize)) + .setOutputDataType(Type.TEXT); } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java index a83328e3e9d..94ba1e16d3d 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/UDTFTopKDTWSlidingWindow.java @@ -45,17 +45,29 @@ public class UDTFTopKDTWSlidingWindow extends UDTFTopKDTW { private static class DTWPath { private final long startTime; + private long endTime; private double distance; - private int length; - private DTWPath(long startTime, double distance, int length) { + private DTWPath(long startTime, long endTime, double distance) { this.startTime = startTime; + this.endTime = endTime; this.distance = distance; - this.length = length; } private DTWPath copy() { - return new DTWPath(startTime, distance, length); + return new DTWPath(startTime, endTime, distance); + } + + @Override + public String toString() { + return "DTWPath{" + + "startTime=" + + startTime + + ", endTime=" + + endTime + + ", distance=" + + distance + + '}'; } } @@ -74,10 +86,10 @@ public class UDTFTopKDTWSlidingWindow extends UDTFTopKDTW { List<DTWPoint> patternList = new ArrayList<>(); while (iterator.hasNextRow()) { Row row = iterator.next(); - if (row.isNull(1)) { + if (row.isNull(COLUMN_P)) { break; } - patternList.add(new DTWPoint(row.getTime(), row.getDouble(1))); + patternList.add(new DTWPoint(row.getTime(), row.getDouble(COLUMN_P))); } pattern = patternList.toArray(new DTWPoint[0]); } @@ -85,17 +97,17 @@ public class UDTFTopKDTWSlidingWindow extends UDTFTopKDTW { RowIterator iterator = rowWindow.getRowIterator(); while (iterator.hasNextRow()) { Row row = iterator.next(); - if (row.isNull(0)) { + if (row.isNull(COLUMN_S)) { continue; } - double value = row.getDouble(0); + double value = row.getDouble(COLUMN_S); dtwCurrent = new DTWPath[pattern.length]; for (int i = 0; i < pattern.length; i++) { double currentDistance = Math.abs(value - pattern[i].value); if (i == 0) { // Start a new DTW path - dtwCurrent[i] = new DTWPath(pattern[i].time, currentDistance, 1); + dtwCurrent[i] = new DTWPath(pattern[i].time, pattern[i].time, currentDistance); continue; } @@ -109,8 +121,8 @@ public class UDTFTopKDTWSlidingWindow extends UDTFTopKDTW { dtwCurrent[i] = dtwBefore[i - 1].copy(); } } + dtwCurrent[i].endTime = pattern[i].time; dtwCurrent[i].distance += currentDistance; - dtwCurrent[i].length++; } dtwBefore = Arrays.copyOf(dtwCurrent, dtwCurrent.length); DTWPath currentPath = dtwCurrent[dtwCurrent.length - 1]; @@ -127,8 +139,7 @@ public class UDTFTopKDTWSlidingWindow extends UDTFTopKDTW { public void terminate(PointCollector collector) throws Exception { while (!topK.isEmpty()) { DTWPath path = topK.poll(); - collector.putDouble(path.startTime, path.distance); - collector.putInt(path.startTime, path.length); + collector.putString(path.startTime, path.toString()); } } }
