This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch variation in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4ec66a91281e9d482a16281930b1a87d3c4783c7 Author: Chen YZ <[email protected]> AuthorDate: Mon Jun 2 01:46:24 2025 +0800 done --- .../table/argument/ScalarArgumentChecker.java | 8 ++ .../java/org/apache/iotdb/udf/api/type/Type.java | 10 ++ .../relational/tvf/VariationTableFunction.java | 114 +++++++++++++++------ 3 files changed, 102 insertions(+), 30 deletions(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java index 241ce5939bc..125f6539101 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java @@ -29,4 +29,12 @@ public class ScalarArgumentChecker { } return "should be a positive value"; }; + + public static Function<Object, String> NON_NEGATIVE_DOUBLE_CHECKER = + (value) -> { + if (value instanceof Double && (Double) value >= 0) { + return null; + } + return "should be a non-negative value"; + }; } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java index 8002d7b7ce5..a5c2852ac1b 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java @@ -22,6 +22,8 @@ package org.apache.iotdb.udf.api.type; import org.apache.tsfile.utils.Binary; import java.time.LocalDate; +import java.util.Arrays; +import java.util.List; /** A substitution class for TsDataType in UDF APIs. */ public enum Type { @@ -98,4 +100,12 @@ public enum Type { return false; } } + + public static List<Type> allTypes() { + return Arrays.asList(BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, DATE, BLOB, STRING); + } + + public static List<Type> numericTypes() { + return Arrays.asList(INT32, INT64, FLOAT, DOUBLE); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index 7dd15dca9e0..44f3c8049a3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -45,9 +45,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.Set; import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; +import static org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.NON_NEGATIVE_DOUBLE_CHECKER; public class VariationTableFunction implements TableFunction { private static final String DATA_PARAMETER_NAME = "DATA"; @@ -65,9 +65,15 @@ public class VariationTableFunction implements TableFunction { ScalarParameterSpecification.builder().name(COL_PARAMETER_NAME).type(Type.STRING).build(), ScalarParameterSpecification.builder() .name(DELTA_PARAMETER_NAME) - .type(Type.DOUBLE).defaultValue(0.0) + .type(Type.DOUBLE) + .defaultValue(0.0) + .addChecker(NON_NEGATIVE_DOUBLE_CHECKER) .build(), - ScalarParameterSpecification.builder().name(IGNORE_NULL_PARAMETER_NAME).type(Type.BOOLEAN).defaultValue(false).build()); + ScalarParameterSpecification.builder() + .name(IGNORE_NULL_PARAMETER_NAME) + .type(Type.BOOLEAN) + .defaultValue(false) + .build()); } @Override @@ -75,21 +81,23 @@ public class VariationTableFunction implements TableFunction { TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); String expectedFieldName = (String) ((ScalarArgument) arguments.get(COL_PARAMETER_NAME)).getValue(); + double delta = (double) ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue(); int requiredIndex = findColumnIndex( tableArgument, expectedFieldName, - ImmutableSet.of(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)); + delta == 0 + ? ImmutableSet.copyOf(Type.allTypes()) + : ImmutableSet.copyOf(Type.numericTypes())); DescribedSchema properColumnSchema = new DescribedSchema.Builder().addField("window_index", Type.INT64).build(); // outputColumnSchema MapTableFunctionHandle handle = new MapTableFunctionHandle.Builder() + .addProperty(DELTA_PARAMETER_NAME, delta) .addProperty( - DELTA_PARAMETER_NAME, - ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue()) - .addProperty(IGNORE_NULL_PARAMETER_NAME, - ((ScalarArgument) arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue()) + IGNORE_NULL_PARAMETER_NAME, + ((ScalarArgument) arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue()) .build(); return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) @@ -109,56 +117,102 @@ public class VariationTableFunction implements TableFunction { TableFunctionHandle tableFunctionHandle) { double delta = (double) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME); - boolean ignoreNull = (boolean) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME); + boolean ignoreNull = + (boolean) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new VariationDataProcessor(delta, ignoreNull); + return delta == 0 + ? new EquivalentVariationDataProcessor(ignoreNull) + : new NumericVariationDataProcessor(delta, ignoreNull); } }; } - private static class VariationDataProcessor implements TableFunctionDataProcessor { + private static class EquivalentVariationDataProcessor extends VariationDataProcessor { + protected Object baseValue = null; + + public EquivalentVariationDataProcessor(boolean ignoreNull) { + super(ignoreNull); + } + + @Override + void handleNonNullValue( + Record input, + List<ColumnBuilder> properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + Object value = input.getObject(0); + if (previousIsNull || !value.equals(baseValue)) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + // use the first value in the window as the base value + baseValue = value; + } + } + } + private static class NumericVariationDataProcessor extends VariationDataProcessor { private final double gap; + protected double baseValue = 0; + + public NumericVariationDataProcessor(double delta, boolean ignoreNull) { + super(ignoreNull); + this.gap = delta; + } + + @Override + void handleNonNullValue( + Record input, + List<ColumnBuilder> properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + double value = input.getDouble(0); + if (previousIsNull || Math.abs(value - baseValue) > gap) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + // use the first value in the window as the base value + baseValue = value; + } + } + } + + private abstract static class VariationDataProcessor implements TableFunctionDataProcessor { + private final boolean ignoreNull; private final Queue<Long> skipIndex = new LinkedList<>(); - private long currentStartIndex = 0; - private double baseValue = 0; - private long curIndex = 0; + protected long currentStartIndex = 0; + protected long curIndex = 0; + protected boolean previousIsNull = true; private long windowIndex = 0; - private boolean previousIsNull = true; - public VariationDataProcessor(double delta, boolean ignoreNull) { - this.gap = delta; + public VariationDataProcessor(boolean ignoreNull) { this.ignoreNull = ignoreNull; } + abstract void handleNonNullValue( + Record input, + List<ColumnBuilder> properColumnBuilders, + ColumnBuilder passThroughIndexBuilder); + @Override public void process( Record input, List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - if(input.isNull(0)){ + if (input.isNull(0)) { // handle null value - if(ignoreNull) { + if (ignoreNull) { // skip null values skipIndex.add(curIndex); - }else if(!previousIsNull) { + } else if (!previousIsNull) { // output window and reset currentStartIndex outputWindow(properColumnBuilders, passThroughIndexBuilder); currentStartIndex = curIndex; previousIsNull = true; } - }else{ - double value = input.getDouble(0); - if (previousIsNull||Math.abs(value - baseValue) > gap) { - outputWindow(properColumnBuilders, passThroughIndexBuilder); - currentStartIndex = curIndex; - // use the first value in the window as the base value - baseValue = value; - } + } else { + handleNonNullValue(input, properColumnBuilders, passThroughIndexBuilder); previousIsNull = false; } @@ -171,11 +225,11 @@ public class VariationTableFunction implements TableFunction { outputWindow(properColumnBuilders, passThroughIndexBuilder); } - private void outputWindow( + protected void outputWindow( List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { boolean increaseIndex = false; for (long i = currentStartIndex; i < curIndex; i++) { - if(!skipIndex.isEmpty()&&i==skipIndex.peek()){ + if (!skipIndex.isEmpty() && i == skipIndex.peek()) { // skip the index if it is in the skip queue skipIndex.poll(); continue;
