This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch variation
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4ec66a91281e9d482a16281930b1a87d3c4783c7
Author: Chen YZ <[email protected]>
AuthorDate: Mon Jun 2 01:46:24 2025 +0800

    done
---
 .../table/argument/ScalarArgumentChecker.java      |   8 ++
 .../java/org/apache/iotdb/udf/api/type/Type.java   |  10 ++
 .../relational/tvf/VariationTableFunction.java     | 114 +++++++++++++++------
 3 files changed, 102 insertions(+), 30 deletions(-)

diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java
index 241ce5939bc..125f6539101 100644
--- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java
+++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/argument/ScalarArgumentChecker.java
@@ -29,4 +29,12 @@ public class ScalarArgumentChecker {
         }
         return "should be a positive value";
       };
+
+  public static Function<Object, String> NON_NEGATIVE_DOUBLE_CHECKER =
+      (value) -> {
+        if (value instanceof Double && (Double) value >= 0) {
+          return null;
+        }
+        return "should be a non-negative value";
+      };
 }
diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java
index 8002d7b7ce5..a5c2852ac1b 100644
--- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java
+++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/type/Type.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.udf.api.type;
 import org.apache.tsfile.utils.Binary;
 
 import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.List;
 
 /** A substitution class for TsDataType in UDF APIs. */
 public enum Type {
@@ -98,4 +100,12 @@ public enum Type {
         return false;
     }
   }
+
+  public static List<Type> allTypes() {
+    return Arrays.asList(BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, TIMESTAMP, DATE, BLOB, STRING);
+  }
+
+  public static List<Type> numericTypes() {
+    return Arrays.asList(INT32, INT64, FLOAT, DOUBLE);
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
index 7dd15dca9e0..44f3c8049a3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
@@ -45,9 +45,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 
 import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+import static org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.NON_NEGATIVE_DOUBLE_CHECKER;
 
 public class VariationTableFunction implements TableFunction {
   private static final String DATA_PARAMETER_NAME = "DATA";
@@ -65,9 +65,15 @@ public class VariationTableFunction implements TableFunction {
         ScalarParameterSpecification.builder().name(COL_PARAMETER_NAME).type(Type.STRING).build(),
         ScalarParameterSpecification.builder()
             .name(DELTA_PARAMETER_NAME)
-            .type(Type.DOUBLE).defaultValue(0.0)
+            .type(Type.DOUBLE)
+            .defaultValue(0.0)
+            .addChecker(NON_NEGATIVE_DOUBLE_CHECKER)
             .build(),
-        ScalarParameterSpecification.builder().name(IGNORE_NULL_PARAMETER_NAME).type(Type.BOOLEAN).defaultValue(false).build());
+        ScalarParameterSpecification.builder()
+            .name(IGNORE_NULL_PARAMETER_NAME)
+            .type(Type.BOOLEAN)
+            .defaultValue(false)
+            .build());
   }
 
   @Override
@@ -75,21 +81,23 @@ public class VariationTableFunction implements TableFunction {
     TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME);
     String expectedFieldName =
         (String) ((ScalarArgument) arguments.get(COL_PARAMETER_NAME)).getValue();
+    double delta = (double) ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue();
     int requiredIndex =
         findColumnIndex(
             tableArgument,
             expectedFieldName,
-            ImmutableSet.of(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE));
+            delta == 0
+                ? ImmutableSet.copyOf(Type.allTypes())
+                : ImmutableSet.copyOf(Type.numericTypes()));
     DescribedSchema properColumnSchema =
         new DescribedSchema.Builder().addField("window_index", Type.INT64).build();
     // outputColumnSchema
     MapTableFunctionHandle handle =
         new MapTableFunctionHandle.Builder()
+            .addProperty(DELTA_PARAMETER_NAME, delta)
             .addProperty(
-                DELTA_PARAMETER_NAME,
-                ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue())
-                .addProperty(IGNORE_NULL_PARAMETER_NAME,
-                    ((ScalarArgument) arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue())
+                IGNORE_NULL_PARAMETER_NAME,
+                ((ScalarArgument) arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue())
             .build();
     return TableFunctionAnalysis.builder()
         .properColumnSchema(properColumnSchema)
@@ -109,56 +117,102 @@ public class VariationTableFunction implements TableFunction {
       TableFunctionHandle tableFunctionHandle) {
     double delta =
         (double) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME);
-    boolean ignoreNull = (boolean) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME);
+    boolean ignoreNull =
+        (boolean)
+            ((MapTableFunctionHandle) tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME);
     return new TableFunctionProcessorProvider() {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
-        return new VariationDataProcessor(delta, ignoreNull);
+        return delta == 0
+            ? new EquivalentVariationDataProcessor(ignoreNull)
+            : new NumericVariationDataProcessor(delta, ignoreNull);
       }
     };
   }
 
-  private static class VariationDataProcessor implements TableFunctionDataProcessor {
+  private static class EquivalentVariationDataProcessor extends VariationDataProcessor {
+    protected Object baseValue = null;
+
+    public EquivalentVariationDataProcessor(boolean ignoreNull) {
+      super(ignoreNull);
+    }
+
+    @Override
+    void handleNonNullValue(
+        Record input,
+        List<ColumnBuilder> properColumnBuilders,
+        ColumnBuilder passThroughIndexBuilder) {
+      Object value = input.getObject(0);
+      if (previousIsNull || !value.equals(baseValue)) {
+        outputWindow(properColumnBuilders, passThroughIndexBuilder);
+        currentStartIndex = curIndex;
+        // use the first value in the window as the base value
+        baseValue = value;
+      }
+    }
+  }
 
+  private static class NumericVariationDataProcessor extends VariationDataProcessor {
     private final double gap;
+    protected double baseValue = 0;
+
+    public NumericVariationDataProcessor(double delta, boolean ignoreNull) {
+      super(ignoreNull);
+      this.gap = delta;
+    }
+
+    @Override
+    void handleNonNullValue(
+        Record input,
+        List<ColumnBuilder> properColumnBuilders,
+        ColumnBuilder passThroughIndexBuilder) {
+      double value = input.getDouble(0);
+      if (previousIsNull || Math.abs(value - baseValue) > gap) {
+        outputWindow(properColumnBuilders, passThroughIndexBuilder);
+        currentStartIndex = curIndex;
+        // use the first value in the window as the base value
+        baseValue = value;
+      }
+    }
+  }
+
+  private abstract static class VariationDataProcessor implements TableFunctionDataProcessor {
+
     private final boolean ignoreNull;
     private final Queue<Long> skipIndex = new LinkedList<>();
 
-    private long currentStartIndex = 0;
-    private double baseValue = 0;
-    private long curIndex = 0;
+    protected long currentStartIndex = 0;
+    protected long curIndex = 0;
+    protected boolean previousIsNull = true;
     private long windowIndex = 0;
-    private boolean previousIsNull = true;
 
-    public VariationDataProcessor(double delta, boolean ignoreNull) {
-      this.gap = delta;
+    public VariationDataProcessor(boolean ignoreNull) {
       this.ignoreNull = ignoreNull;
     }
 
+    abstract void handleNonNullValue(
+        Record input,
+        List<ColumnBuilder> properColumnBuilders,
+        ColumnBuilder passThroughIndexBuilder);
+
     @Override
     public void process(
         Record input,
         List<ColumnBuilder> properColumnBuilders,
         ColumnBuilder passThroughIndexBuilder) {
-      if(input.isNull(0)){
+      if (input.isNull(0)) {
         // handle null value
-        if(ignoreNull) {
+        if (ignoreNull) {
           // skip null values
           skipIndex.add(curIndex);
-        }else if(!previousIsNull) {
+        } else if (!previousIsNull) {
           // output window and reset currentStartIndex
           outputWindow(properColumnBuilders, passThroughIndexBuilder);
           currentStartIndex = curIndex;
           previousIsNull = true;
         }
-      }else{
-        double value = input.getDouble(0);
-        if (previousIsNull||Math.abs(value - baseValue) > gap) {
-          outputWindow(properColumnBuilders, passThroughIndexBuilder);
-          currentStartIndex = curIndex;
-          // use the first value in the window as the base value
-          baseValue = value;
-        }
+      } else {
+        handleNonNullValue(input, properColumnBuilders, passThroughIndexBuilder);
         previousIsNull = false;
       }
 
@@ -171,11 +225,11 @@ public class VariationTableFunction implements TableFunction {
       outputWindow(properColumnBuilders, passThroughIndexBuilder);
     }
 
-    private void outputWindow(
+    protected void outputWindow(
         List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) {
       boolean increaseIndex = false;
       for (long i = currentStartIndex; i < curIndex; i++) {
-        if(!skipIndex.isEmpty()&&i==skipIndex.peek()){
+        if (!skipIndex.isEmpty() && i == skipIndex.peek()) {
           // skip the index if it is in the skip queue
           skipIndex.poll();
           continue;

Reply via email to