This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a437cd469b7 Add SLIDE parameter support to CAPACITY table-valued function (#17456)
a437cd469b7 is described below

commit a437cd469b7c6da87cc11003ca78371c5fa2e539
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 9 15:50:45 2026 +0800

    Add SLIDE parameter support to CAPACITY table-valued function (#17456)
---
 .../relational/it/db/it/IoTDBWindowTVFIT.java      | 101 ++++++++++
 .../relational/tvf/CapacityTableFunction.java      |  69 ++++---
 .../relational/tvf/CapacityTableFunctionTest.java  | 204 +++++++++++++++++++++
 3 files changed, 348 insertions(+), 26 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
index 20339d4bf1f..d47800704ae 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
@@ -434,6 +434,107 @@ public class IoTDBWindowTVFIT {
         expectedHeader,
         retArray,
         DATABASE_NAME);
+
+    // CAPACITY with SLIDE=2 (same as SIZE=2, should behave identically to no 
SLIDE)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 2) ORDER BY stock_id, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=2, SLIDE=1 (overlapping windows)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "1,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "2,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "1,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+          "2,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 1) ORDER BY stock_id, window_index, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=3, SLIDE=2 (overlapping windows, different params)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "0,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "1,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+          "0,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+          "1,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 3, SLIDE => 2) ORDER BY stock_id, window_index, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=2, SLIDE=3 (gap windows, some rows discarded)
+    expectedHeader = new String[] {"window_index", "time", "stock_id", 
"price", "s1"};
+    retArray =
+        new String[] {
+          "0,2021-01-01T09:05:00.000Z,AAPL,100.0,101.0,",
+          "0,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
+          "0,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
+          "0,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 3) ORDER BY stock_id, window_index, time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with SIZE=2, SLIDE=1 + GROUP BY (verify aggregation with 
overlapping windows)
+    expectedHeader = new String[] {"stock_id", "window_index", "avg"};
+    retArray =
+        new String[] {
+          "AAPL,0,101.5,",
+          "AAPL,1,102.5,",
+          "AAPL,2,102.0,",
+          "TESL,0,201.0,",
+          "TESL,1,198.5,",
+          "TESL,2,195.0,",
+        };
+    tableResultSetEqualTest(
+        "SELECT stock_id, window_index, avg(price) as avg FROM CAPACITY(DATA 
=> bid PARTITION BY stock_id ORDER BY time, SIZE => 2, SLIDE => 1) GROUP BY 
window_index, stock_id ORDER BY stock_id, window_index",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // CAPACITY with negative SLIDE (error case)
+    tableAssertTestFail(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => -1) ORDER BY stock_id, time",
+        "Invalid scalar argument SLIDE, should be a positive value",
+        DATABASE_NAME);
+
+    // CAPACITY with SLIDE=0 (error case)
+    tableAssertTestFail(
+        "SELECT * FROM CAPACITY(DATA => bid PARTITION BY stock_id ORDER BY 
time, SIZE => 2, SLIDE => 0) ORDER BY stock_id, time",
+        "Invalid scalar argument SLIDE, should be a positive value",
+        DATABASE_NAME);
   }
 
   @Test
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
index 7ea8d751413..a037500c006 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
@@ -43,9 +43,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
 public class CapacityTableFunction implements TableFunction {
   private static final String DATA_PARAMETER_NAME = "DATA";
   private static final String SIZE_PARAMETER_NAME = "SIZE";
+  private static final String SLIDE_PARAMETER_NAME = "SLIDE";
 
   @Override
   public List<ParameterSpecification> getArgumentsSpecifications() {
@@ -54,7 +57,17 @@ public class CapacityTableFunction implements TableFunction {
             .name(DATA_PARAMETER_NAME)
             .passThroughColumns()
             .build(),
-        ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build());
+        ScalarParameterSpecification.builder()
+            .name(SIZE_PARAMETER_NAME)
+            .addChecker(POSITIVE_LONG_CHECKER)
+            .type(Type.INT64)
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(SLIDE_PARAMETER_NAME)
+            .addChecker(POSITIVE_LONG_CHECKER)
+            .type(Type.INT64)
+            .defaultValue(-1L)
+            .build());
   }
 
   @Override
@@ -63,8 +76,16 @@ public class CapacityTableFunction implements TableFunction {
     if (size <= 0) {
       throw new UDFException(CommonMessages.SIZE_MUST_BE_POSITIVE);
     }
+    long slide = (long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue();
+    // default SLIDE to SIZE when not specified (sentinel value -1)
+    if (slide == -1L) {
+      slide = size;
+    }
     MapTableFunctionHandle handle =
-        new MapTableFunctionHandle.Builder().addProperty(SIZE_PARAMETER_NAME, 
size).build();
+        new MapTableFunctionHandle.Builder()
+            .addProperty(SIZE_PARAMETER_NAME, size)
+            .addProperty(SLIDE_PARAMETER_NAME, slide)
+            .build();
     return TableFunctionAnalysis.builder()
         .properColumnSchema(
             new DescribedSchema.Builder().addField("window_index", 
Type.INT64).build())
@@ -82,12 +103,13 @@ public class CapacityTableFunction implements TableFunction {
   @Override
   public TableFunctionProcessorProvider getProcessorProvider(
       TableFunctionHandle tableFunctionHandle) {
-    long sz =
-        (long) ((MapTableFunctionHandle) 
tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME);
+    MapTableFunctionHandle handle = (MapTableFunctionHandle) 
tableFunctionHandle;
+    long size = (long) handle.getProperty(SIZE_PARAMETER_NAME);
+    long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME);
     return new TableFunctionProcessorProvider() {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
-        return new CapacityDataProcessor(sz);
+        return new CapacityDataProcessor(size, slide);
       }
     };
   }
@@ -95,12 +117,12 @@ public class CapacityTableFunction implements TableFunction {
   private static class CapacityDataProcessor implements 
TableFunctionDataProcessor {
 
     private final long size;
-    private long currentStartIndex = 0;
+    private final long slide;
     private long curIndex = 0;
-    private long windowIndex = 0;
 
-    public CapacityDataProcessor(long size) {
+    public CapacityDataProcessor(long size, long slide) {
       this.size = size;
+      this.slide = slide;
     }
 
     @Override
@@ -108,26 +130,21 @@ public class CapacityTableFunction implements TableFunction {
         Record input,
         List<ColumnBuilder> properColumnBuilders,
         ColumnBuilder passThroughIndexBuilder) {
-      if (curIndex - currentStartIndex == size) {
-        outputWindow(properColumnBuilders, passThroughIndexBuilder);
-        currentStartIndex = curIndex;
+      // For each row at curIndex, find all windows k such that:
+      //   k * slide <= curIndex < k * slide + size, and k >= 0
+      // The first valid k: max(0, ceil((curIndex - size + 1) / slide))
+      // The last valid k: floor(curIndex / slide)
+      long firstWindow = Math.max(0, (curIndex - size + slide) / slide);
+      long lastWindow = curIndex / slide;
+      for (long k = firstWindow; k <= lastWindow; k++) {
+        // Verify: k * slide <= curIndex < k * slide + size
+        long windowStart = k * slide;
+        if (windowStart <= curIndex && curIndex < windowStart + size) {
+          properColumnBuilders.get(0).writeLong(k);
+          passThroughIndexBuilder.writeLong(curIndex);
+        }
       }
       curIndex++;
     }
-
-    @Override
-    public void finish(
-        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
-      outputWindow(properColumnBuilders, passThroughIndexBuilder);
-    }
-
-    private void outputWindow(
-        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
-      for (long i = currentStartIndex; i < curIndex; i++) {
-        properColumnBuilders.get(0).writeLong(windowIndex);
-        passThroughIndexBuilder.writeLong(i);
-      }
-      windowIndex++;
-    }
   }
 }
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java
new file mode 100644
index 00000000000..4f40e737586
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunctionTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class CapacityTableFunctionTest {
+
+  private final CapacityTableFunction function = new CapacityTableFunction();
+
+  // ======================== analyze() tests ========================
+
+  @Test
+  public void testAnalyzeWithSlideDefault() throws UDFException {
+    Map<String, Argument> args = new HashMap<>();
+    args.put("SIZE", new ScalarArgument(Type.INT64, 5L));
+    args.put("SLIDE", new ScalarArgument(Type.INT64, -1L));
+
+    TableFunctionAnalysis analysis = function.analyze(args);
+    assertNotNull(analysis);
+  }
+
+  @Test
+  public void testAnalyzeWithExplicitSlide() throws UDFException {
+    Map<String, Argument> args = new HashMap<>();
+    args.put("SIZE", new ScalarArgument(Type.INT64, 4L));
+    args.put("SLIDE", new ScalarArgument(Type.INT64, 2L));
+
+    TableFunctionAnalysis analysis = function.analyze(args);
+    assertNotNull(analysis);
+  }
+
+  @Test(expected = UDFException.class)
+  public void testAnalyzeSizeZero() throws UDFException {
+    Map<String, Argument> args = new HashMap<>();
+    args.put("SIZE", new ScalarArgument(Type.INT64, 0L));
+    args.put("SLIDE", new ScalarArgument(Type.INT64, -1L));
+
+    function.analyze(args);
+  }
+
+  @Test(expected = UDFException.class)
+  public void testAnalyzeSizeNegative() throws UDFException {
+    Map<String, Argument> args = new HashMap<>();
+    args.put("SIZE", new ScalarArgument(Type.INT64, -3L));
+    args.put("SLIDE", new ScalarArgument(Type.INT64, -1L));
+
+    function.analyze(args);
+  }
+
+  // ======================== processor tests ========================
+
+  /**
+   * Helper: builds the processor from analyze() -> getProcessorProvider() 
chain, then feeds N rows
+   * through process(). Returns captured (windowIndex, passThroughIndex) pairs.
+   */
+  private List<long[]> runProcessor(long size, long slide, int rowCount) 
throws UDFException {
+    Map<String, Argument> args = new HashMap<>();
+    args.put("SIZE", new ScalarArgument(Type.INT64, size));
+    args.put("SLIDE", new ScalarArgument(Type.INT64, slide == -1 ? -1L : 
slide));
+
+    TableFunctionAnalysis analysis = function.analyze(args);
+    TableFunctionHandle handle = analysis.getTableFunctionHandle();
+
+    TableFunctionProcessorProvider provider = 
function.getProcessorProvider(handle);
+    TableFunctionDataProcessor processor = provider.getDataProcessor();
+
+    Record mockRecord = Mockito.mock(Record.class);
+    List<long[]> results = new ArrayList<>();
+
+    for (int i = 0; i < rowCount; i++) {
+      ArgumentCaptor<Long> windowCaptor = ArgumentCaptor.forClass(Long.class);
+      ArgumentCaptor<Long> indexCaptor = ArgumentCaptor.forClass(Long.class);
+
+      ColumnBuilder properBuilder = Mockito.mock(ColumnBuilder.class);
+      ColumnBuilder passThroughBuilder = Mockito.mock(ColumnBuilder.class);
+
+      processor.process(mockRecord, Collections.singletonList(properBuilder), 
passThroughBuilder);
+
+      Mockito.verify(properBuilder, 
Mockito.atLeast(0)).writeLong(windowCaptor.capture());
+      Mockito.verify(passThroughBuilder, 
Mockito.atLeast(0)).writeLong(indexCaptor.capture());
+
+      List<Long> windows = windowCaptor.getAllValues();
+      List<Long> indices = indexCaptor.getAllValues();
+      for (int j = 0; j < windows.size(); j++) {
+        results.add(new long[] {windows.get(j), indices.get(j)});
+      }
+    }
+    return results;
+  }
+
+  @Test
+  public void testSlideEqualsSize() throws UDFException {
+    // SIZE=2, SLIDE=2 (non-overlapping), 5 rows
+    // window0: rows 0,1; window1: rows 2,3; window2: row 4
+    List<long[]> results = runProcessor(2, 2, 5);
+    long[][] expected = {{0, 0}, {0, 1}, {1, 2}, {1, 3}, {2, 4}};
+    assertResultsEqual(expected, results);
+  }
+
+  @Test
+  public void testSlideDefault() throws UDFException {
+    // SIZE=2, SLIDE=-1 (defaults to SIZE=2), 5 rows — same as above
+    List<long[]> results = runProcessor(2, -1, 5);
+    long[][] expected = {{0, 0}, {0, 1}, {1, 2}, {1, 3}, {2, 4}};
+    assertResultsEqual(expected, results);
+  }
+
+  @Test
+  public void testSlideLessThanSize() throws UDFException {
+    // SIZE=2, SLIDE=1 (overlapping), 3 rows
+    // row0: window 0
+    // row1: window 0, 1
+    // row2: window 1, 2
+    List<long[]> results = runProcessor(2, 1, 3);
+    long[][] expected = {{0, 0}, {0, 1}, {1, 1}, {1, 2}, {2, 2}};
+    assertResultsEqual(expected, results);
+  }
+
+  @Test
+  public void testSlideGreaterThanSize() throws UDFException {
+    // SIZE=2, SLIDE=3 (gap), 6 rows
+    // window0: rows 0,1; row2: gap; window1: rows 3,4; row5: gap
+    List<long[]> results = runProcessor(2, 3, 6);
+    long[][] expected = {{0, 0}, {0, 1}, {1, 3}, {1, 4}};
+    assertResultsEqual(expected, results);
+  }
+
+  @Test
+  public void testOverlappingLargeSize() throws UDFException {
+    // SIZE=3, SLIDE=2 (overlapping), 3 rows
+    // row0: window 0
+    // row1: window 0
+    // row2: window 0, 1
+    List<long[]> results = runProcessor(3, 2, 3);
+    long[][] expected = {{0, 0}, {0, 1}, {0, 2}, {1, 2}};
+    assertResultsEqual(expected, results);
+  }
+
+  @Test
+  public void testSingleRow() throws UDFException {
+    // SIZE=3, SLIDE=1, 1 row — row0 belongs to window 0 only
+    List<long[]> results = runProcessor(3, 1, 1);
+    long[][] expected = {{0, 0}};
+    assertResultsEqual(expected, results);
+  }
+
+  @Test
+  public void testGetArgumentsSpecifications() {
+    assertEquals(3, function.getArgumentsSpecifications().size());
+  }
+
+  @Test
+  public void testCreateTableFunctionHandle() {
+    assertNotNull(function.createTableFunctionHandle());
+  }
+
+  private void assertResultsEqual(long[][] expected, List<long[]> actual) {
+    assertEquals("Result count mismatch", expected.length, actual.size());
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals("Window index mismatch at position " + i, expected[i][0], 
actual.get(i)[0]);
+      assertEquals("PassThrough index mismatch at position " + i, 
expected[i][1], actual.get(i)[1]);
+    }
+  }
+}

Reply via email to