This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch variation
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9808d6a7156deef941e4da846824526fc30607b8
Author: Chen YZ <[email protected]>
AuthorDate: Mon Jun 2 01:16:48 2025 +0800

    support ignoreNull
---
 .../relational/tvf/VariationTableFunction.java     | 63 ++++++++++++++++------
 1 file changed, 47 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
index 284f623f301..7dd15dca9e0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
@@ -41,8 +41,11 @@ import org.apache.tsfile.block.column.ColumnBuilder;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 
 import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
 
@@ -50,6 +53,7 @@ public class VariationTableFunction implements TableFunction {
   private static final String DATA_PARAMETER_NAME = "DATA";
   private static final String COL_PARAMETER_NAME = "COL";
   private static final String DELTA_PARAMETER_NAME = "DELTA";
+  private static final String IGNORE_NULL_PARAMETER_NAME = "IGNORENULL";
 
   @Override
   public List<ParameterSpecification> getArgumentsSpecifications() {
@@ -61,8 +65,9 @@ public class VariationTableFunction implements TableFunction {
         
ScalarParameterSpecification.builder().name(COL_PARAMETER_NAME).type(Type.STRING).build(),
         ScalarParameterSpecification.builder()
             .name(DELTA_PARAMETER_NAME)
-            .type(Type.DOUBLE)
-            .build());
+            .type(Type.DOUBLE).defaultValue(0.0)
+            .build(),
+        
ScalarParameterSpecification.builder().name(IGNORE_NULL_PARAMETER_NAME).type(Type.BOOLEAN).defaultValue(false).build());
   }
 
   @Override
@@ -83,6 +88,8 @@ public class VariationTableFunction implements TableFunction {
             .addProperty(
                 DELTA_PARAMETER_NAME,
                 ((ScalarArgument) 
arguments.get(DELTA_PARAMETER_NAME)).getValue())
+                .addProperty(IGNORE_NULL_PARAMETER_NAME,
+                    ((ScalarArgument) 
arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue())
             .build();
     return TableFunctionAnalysis.builder()
         .properColumnSchema(properColumnSchema)
@@ -102,10 +109,11 @@ public class VariationTableFunction implements 
TableFunction {
       TableFunctionHandle tableFunctionHandle) {
     double delta =
         (double) ((MapTableFunctionHandle) 
tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME);
+    boolean ignoreNull = (boolean) ((MapTableFunctionHandle) 
tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME);
     return new TableFunctionProcessorProvider() {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
-        return new VariationDataProcessor(delta);
+        return new VariationDataProcessor(delta, ignoreNull);
       }
     };
   }
@@ -113,13 +121,18 @@ public class VariationTableFunction implements 
TableFunction {
   private static class VariationDataProcessor implements 
TableFunctionDataProcessor {
 
     private final double gap;
-    private long currentStartIndex = -1;
+    private final boolean ignoreNull;
+    private final Queue<Long> skipIndex = new LinkedList<>();
+
+    private long currentStartIndex = 0;
     private double baseValue = 0;
     private long curIndex = 0;
     private long windowIndex = 0;
+    private boolean previousIsNull = true;
 
-    public VariationDataProcessor(double delta) {
+    public VariationDataProcessor(double delta, boolean ignoreNull) {
       this.gap = delta;
+      this.ignoreNull = ignoreNull;
     }
 
     @Override
@@ -127,17 +140,28 @@ public class VariationTableFunction implements 
TableFunction {
         Record input,
         List<ColumnBuilder> properColumnBuilders,
         ColumnBuilder passThroughIndexBuilder) {
-      double value = input.getDouble(0);
-      if (currentStartIndex == -1) {
-        // init the first window
-        currentStartIndex = curIndex;
-        baseValue = value;
-      } else if (Math.abs(value - baseValue) > gap) {
-        outputWindow(properColumnBuilders, passThroughIndexBuilder);
-        currentStartIndex = curIndex;
-        // use the first value in the window as the base value
-        baseValue = value;
+      if(input.isNull(0)){
+        // handle null value
+        if(ignoreNull) {
+          // skip null values
+          skipIndex.add(curIndex);
+        }else if(!previousIsNull) {
+          // output window and reset currentStartIndex
+          outputWindow(properColumnBuilders, passThroughIndexBuilder);
+          currentStartIndex = curIndex;
+          previousIsNull = true;
+        }
+      }else{
+        double value = input.getDouble(0);
+        if (previousIsNull||Math.abs(value - baseValue) > gap) {
+          outputWindow(properColumnBuilders, passThroughIndexBuilder);
+          currentStartIndex = curIndex;
+          // use the first value in the window as the base value
+          baseValue = value;
+        }
+        previousIsNull = false;
       }
+
       curIndex++;
     }
 
@@ -149,11 +173,18 @@ public class VariationTableFunction implements 
TableFunction {
 
     private void outputWindow(
         List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      boolean increaseIndex = false;
       for (long i = currentStartIndex; i < curIndex; i++) {
+        if(!skipIndex.isEmpty()&&i==skipIndex.peek()){
+          // skip the index if it is in the skip queue
+          skipIndex.poll();
+          continue;
+        }
         properColumnBuilders.get(0).writeLong(windowIndex);
         passThroughIndexBuilder.writeLong(i);
+        increaseIndex = true;
       }
-      windowIndex++;
+      windowIndex += increaseIndex ? 1 : 0;
     }
   }
 }

Reply via email to