This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 983e927ce97 Add M4 table function implementation and integrate with statement ana… (#17656)
983e927ce97 is described below

commit 983e927ce9745a4a73ad8559848f47f07b9c757d
Author: Zhao Xinqi <[email protected]>
AuthorDate: Wed Jun 10 16:43:37 2026 +0800

    Add M4 table function implementation and integrate with statement ana… (#17656)
---
 .../relational/it/db/it/IoTDBWindowTVFIT.java      | 334 +++++++++
 .../relational/analyzer/StatementAnalyzer.java     | 144 +++-
 .../plan/relational/planner/RelationPlanner.java   |   6 +-
 .../relational/analyzer/TableFunctionTest.java     | 187 +++++
 .../function/TableBuiltinTableFunction.java        |   4 +
 .../builtin/relational/tvf/M4TableFunction.java    | 802 +++++++++++++++++++++
 6 files changed, 1475 insertions(+), 2 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
index d47800704ae..3aa33889568 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java
@@ -62,6 +62,31 @@ public class IoTDBWindowTVFIT {
         "insert into multi_type values (2021-01-01T09:05:00, 'device1', 3, 3, 
3.0, 3.0, false, '2', X'02', 2021-01-01T10:00:00, '2021-01-02')",
         "create table t1 (value double field, value1 int32 field)",
         "insert into t1 values (1, 1, 0),(2, 2, 0),(3, 1, 0),(4, 1, 0),(5, 1, 
0),(6, 1.2, 0),(7, 1, 0),(8, 1, 0),(9, 2, 0),(10, 3, 0),(11, 4, 0),(12, 3, 
0),(13, 2, 0),(14, 3, 0),(15, 4, 0),(16, 2, 0),(17, 1, 0),(18, 1, 0),(19, 1, 
0),(20, 1, 0),(21, 1, 0),(41, 1, 0),(42, 2, 0),(43, 3, 0),(44, 4, 0),(45, 3, 
0),(46, 2, 0),(47, 2, 0),(48, 2, 0),(49, 2, 0),(50, 2, 0),(51, 2, 0),(52, 2, 
0)",
+        // M4 table function test data
+        "CREATE TABLE table1(device_id STRING TAG, s1 DOUBLE FIELD, s2 DOUBLE 
FIELD, s3 STRING FIELD)",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.001+08:00, 'device_1', 15.0, 12.0, 'OK')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.003+08:00, 'device_1', 5.0, null, 'OK')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.006+08:00, 'device_1', 30.0, null, 'WARN')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.009+08:00, 'device_1', 10.0, null, 'OK')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.020+08:00, 'device_1', 40.0, 35.0, 'CRIT')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.002+08:00, 'device_2', 8.0, 8.0, 'OK')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.005+08:00, 'device_2', 25.0, 24.0, 'WARN')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.008+08:00, 'device_2', 12.0, 13.0, 'OK')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.011+08:00, 'device_2', 18.0, 19.0, 'OK')",
+        "INSERT INTO table1(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.015+08:00, 'device_2', 6.0, 7.0, 'WARN')",
+        "CREATE TABLE table3(device_id STRING TAG, s1 DOUBLE FIELD, s2 DOUBLE 
FIELD, s3 STRING FIELD)",
+        "INSERT INTO table3(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.001+08:00, 'device_1', null, null, null)",
+        "INSERT INTO table3(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.003+08:00, 'device_1', null, null, null)",
+        "INSERT INTO table3(time, device_id, s1, s2, s3) VALUES 
(1970-01-01T08:00:00.020+08:00, 'device_1', 40.0, 35.0, 'CRIT')",
+        "CREATE TABLE table4(device_id STRING TAG, object_col BLOB FIELD)",
+        "INSERT INTO table4(time, device_id, object_col) VALUES 
(1970-01-01T08:00:00.001+08:00, 'device_1', X'0102')",
+        "CREATE TABLE table5(factory_id STRING TAG, device_id STRING TAG, s1 
DOUBLE FIELD)",
+        "INSERT INTO table5(time, factory_id, device_id, s1) VALUES 
(1970-01-01T08:00:00.001+08:00, 'F1', 'device_1', 10.0)",
+        "INSERT INTO table5(time, factory_id, device_id, s1) VALUES 
(1970-01-01T08:00:00.005+08:00, 'F1', 'device_1', 15.0)",
+        "INSERT INTO table5(time, factory_id, device_id, s1) VALUES 
(1970-01-01T08:00:00.009+08:00, 'F1', 'device_1', 5.0)",
+        "INSERT INTO table5(time, factory_id, device_id, s1) VALUES 
(1970-01-01T08:00:00.002+08:00, 'F1', 'device_2', 20.0)",
+        "INSERT INTO table5(time, factory_id, device_id, s1) VALUES 
(1970-01-01T08:00:00.011+08:00, 'F1', 'device_2', 25.0)",
+        "INSERT INTO table5(time, factory_id, device_id, s1) VALUES 
(1970-01-01T08:00:00.003+08:00, 'F2', 'device_1', 30.0)",
         "FLUSH",
         "CLEAR ATTRIBUTE CACHE",
       };
@@ -1013,4 +1038,313 @@ public class IoTDBWindowTVFIT {
         "Invalid pattern",
         DATABASE_NAME);
   }
+
+  @Test
+  public void testM4TimeWindowMode() {
+    String[] expectedHeader =
+        new String[] {
+          "window_start",
+          "window_end",
+          "device_id",
+          "s1_time",
+          "s1",
+          "s2_time",
+          "s2",
+          "s3_time",
+          "s3"
+        };
+    String[] retArray =
+        new String[] {
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.001Z,15.0,1970-01-01T00:00:00.001Z,12.0,1970-01-01T00:00:00.001Z,OK,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.003Z,5.0,null,null,1970-01-01T00:00:00.006Z,WARN,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.006Z,30.0,null,null,1970-01-01T00:00:00.009Z,OK,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.009Z,10.0,null,null,null,null,",
+          
"1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.030Z,device_1,1970-01-01T00:00:00.020Z,40.0,1970-01-01T00:00:00.020Z,35.0,1970-01-01T00:00:00.020Z,CRIT,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_2,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,OK,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_2,1970-01-01T00:00:00.005Z,25.0,1970-01-01T00:00:00.005Z,24.0,1970-01-01T00:00:00.005Z,WARN,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_2,1970-01-01T00:00:00.008Z,12.0,1970-01-01T00:00:00.008Z,13.0,1970-01-01T00:00:00.008Z,OK,",
+          
"1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,device_2,1970-01-01T00:00:00.011Z,18.0,1970-01-01T00:00:00.011Z,19.0,1970-01-01T00:00:00.011Z,OK,",
+          
"1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,device_2,1970-01-01T00:00:00.015Z,6.0,1970-01-01T00:00:00.015Z,7.0,1970-01-01T00:00:00.015Z,WARN,"
+        };
+    tableResultSetEqualTest(
+        "SELECT window_start, window_end, device_id, s1_time, s1, s2_time, s2, 
s3_time, s3 "
+            + "FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, "
+            + "TIMECOL => 'time', SIZE => 10ms) "
+            + "ORDER BY device_id, window_start, s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4TimeWindowModeSelectStar() {
+    String[] expectedHeader =
+        new String[] {
+          "window_start",
+          "window_end",
+          "device_id",
+          "s1_time",
+          "s1",
+          "s2_time",
+          "s2",
+          "s3_time",
+          "s3"
+        };
+    String[] retArray =
+        new String[] {
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.001Z,15.0,1970-01-01T00:00:00.001Z,12.0,1970-01-01T00:00:00.001Z,OK,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.003Z,5.0,null,null,1970-01-01T00:00:00.006Z,WARN,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.006Z,30.0,null,null,1970-01-01T00:00:00.009Z,OK,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.009Z,10.0,null,null,null,null,",
+          
"1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.030Z,device_1,1970-01-01T00:00:00.020Z,40.0,1970-01-01T00:00:00.020Z,35.0,1970-01-01T00:00:00.020Z,CRIT,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_2,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,OK,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_2,1970-01-01T00:00:00.005Z,25.0,1970-01-01T00:00:00.005Z,24.0,1970-01-01T00:00:00.005Z,WARN,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_2,1970-01-01T00:00:00.008Z,12.0,1970-01-01T00:00:00.008Z,13.0,1970-01-01T00:00:00.008Z,OK,",
+          
"1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,device_2,1970-01-01T00:00:00.011Z,18.0,1970-01-01T00:00:00.011Z,19.0,1970-01-01T00:00:00.011Z,OK,",
+          
"1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,device_2,1970-01-01T00:00:00.015Z,6.0,1970-01-01T00:00:00.015Z,7.0,1970-01-01T00:00:00.015Z,WARN,"
+        };
+    tableResultSetEqualTest(
+        "SELECT * "
+            + "FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, "
+            + "TIMECOL => 'time', SIZE => 10ms) "
+            + "ORDER BY device_id, window_start, s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4TimeWindowModeByPosition() {
+    String[] expectedHeader =
+        new String[] {"window_start", "window_end", "device_id", "s1_time", 
"s1"};
+    String[] retArray =
+        new String[] {
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.001Z,15.0,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.003Z,5.0,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.006Z,30.0,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,device_1,1970-01-01T00:00:00.009Z,10.0,"
+        };
+    tableResultSetEqualTest(
+        "SELECT window_start, window_end, device_id, s1_time, s1 "
+            + "FROM M4(table1 PARTITION BY device_id ORDER BY time, 'time', 
10ms) "
+            + "WHERE device_id = 'device_1' AND window_start = 0 "
+            + "ORDER BY s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4TimeWindowWithSlide() {
+    String[] expectedHeader =
+        new String[] {
+          "window_start",
+          "window_end",
+          "device_id",
+          "s1_time",
+          "s1",
+          "s2_time",
+          "s2",
+          "s3_time",
+          "s3"
+        };
+    String[] retArray =
+        new String[] {
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.005Z,device_1,1970-01-01T00:00:00.001Z,15.0,1970-01-01T00:00:00.001Z,12.0,1970-01-01T00:00:00.001Z,OK,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.005Z,device_1,1970-01-01T00:00:00.003Z,5.0,null,null,1970-01-01T00:00:00.003Z,OK,",
+          
"1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.025Z,device_1,1970-01-01T00:00:00.020Z,40.0,1970-01-01T00:00:00.020Z,35.0,1970-01-01T00:00:00.020Z,CRIT,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.005Z,device_2,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,OK,",
+          
"1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.015Z,device_2,1970-01-01T00:00:00.011Z,18.0,1970-01-01T00:00:00.011Z,19.0,1970-01-01T00:00:00.011Z,OK,"
+        };
+    tableResultSetEqualTest(
+        "SELECT window_start, window_end, device_id, s1_time, s1, s2_time, s2, 
s3_time, s3 "
+            + "FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, "
+            + "TIMECOL => 'time', SIZE => 5ms, SLIDE => 10ms) "
+            + "ORDER BY device_id, window_start, s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4CountWindowMode() {
+    String[] expectedHeader =
+        new String[] {
+          "window_index", "device_id", "s1_time", "s1", "s2_time", "s2", 
"s3_time", "s3"
+        };
+    String[] retArray =
+        new String[] {
+          
"0,device_1,1970-01-01T00:00:00.001Z,15.0,1970-01-01T00:00:00.001Z,12.0,1970-01-01T00:00:00.001Z,OK,",
+          
"0,device_1,1970-01-01T00:00:00.003Z,5.0,null,null,1970-01-01T00:00:00.003Z,OK,",
+          
"1,device_1,1970-01-01T00:00:00.006Z,30.0,null,null,1970-01-01T00:00:00.006Z,WARN,",
+          
"1,device_1,1970-01-01T00:00:00.009Z,10.0,null,null,1970-01-01T00:00:00.009Z,OK,",
+          
"2,device_1,1970-01-01T00:00:00.020Z,40.0,1970-01-01T00:00:00.020Z,35.0,1970-01-01T00:00:00.020Z,CRIT,",
+          
"0,device_2,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,OK,",
+          
"0,device_2,1970-01-01T00:00:00.005Z,25.0,1970-01-01T00:00:00.005Z,24.0,1970-01-01T00:00:00.005Z,WARN,",
+          
"1,device_2,1970-01-01T00:00:00.008Z,12.0,1970-01-01T00:00:00.008Z,13.0,1970-01-01T00:00:00.008Z,OK,",
+          
"1,device_2,1970-01-01T00:00:00.011Z,18.0,1970-01-01T00:00:00.011Z,19.0,1970-01-01T00:00:00.011Z,OK,",
+          
"2,device_2,1970-01-01T00:00:00.015Z,6.0,1970-01-01T00:00:00.015Z,7.0,1970-01-01T00:00:00.015Z,WARN,"
+        };
+    tableResultSetEqualTest(
+        "SELECT * "
+            + "FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, "
+            + "TIMECOL => 'time', SIZE => 2) "
+            + "ORDER BY device_id, window_index, s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4Origin() {
+    String[] expectedHeader =
+        new String[] {"window_start", "window_end", "device_id", "s1_time", 
"s1"};
+    String[] retArray =
+        new String[] {
+          
"1969-12-31T23:59:59.995Z,1970-01-01T00:00:00.005Z,device_1,1970-01-01T00:00:00.001Z,15.0,",
+          
"1969-12-31T23:59:59.995Z,1970-01-01T00:00:00.005Z,device_1,1970-01-01T00:00:00.003Z,5.0,",
+          
"1970-01-01T00:00:00.005Z,1970-01-01T00:00:00.015Z,device_1,1970-01-01T00:00:00.006Z,30.0,",
+          
"1970-01-01T00:00:00.005Z,1970-01-01T00:00:00.015Z,device_1,1970-01-01T00:00:00.009Z,10.0,",
+          
"1970-01-01T00:00:00.015Z,1970-01-01T00:00:00.025Z,device_1,1970-01-01T00:00:00.020Z,40.0,"
+        };
+    tableResultSetEqualTest(
+        "SELECT window_start, window_end, device_id, s1_time, s1 "
+            + "FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, "
+            + "TIMECOL => 'time', ORIGIN => 1970-01-01T08:00:00.005+08:00, 
SIZE => 10ms) "
+            + "WHERE device_id = 'device_1' ORDER BY window_start, s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4MultiplePartitionColumns() {
+    String[] expectedHeader =
+        new String[] {"window_start", "window_end", "factory_id", "device_id", 
"s1_time", "s1"};
+    String[] retArray =
+        new String[] {
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,F1,device_1,1970-01-01T00:00:00.001Z,10.0,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,F1,device_1,1970-01-01T00:00:00.005Z,15.0,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,F1,device_1,1970-01-01T00:00:00.009Z,5.0,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,F1,device_2,1970-01-01T00:00:00.002Z,20.0,",
+          
"1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,F1,device_2,1970-01-01T00:00:00.011Z,25.0,",
+          
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,F2,device_1,1970-01-01T00:00:00.003Z,30.0,"
+        };
+    tableResultSetEqualTest(
+        "SELECT window_start, window_end, factory_id, device_id, s1_time, s1 "
+            + "FROM M4(DATA => table5 PARTITION BY (factory_id, device_id) 
ORDER BY time, "
+            + "TIMECOL => 'time', SIZE => 10ms) "
+            + "ORDER BY factory_id, device_id, window_start, s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4AllNullWindowSkipped() {
+    String[] expectedHeader =
+        new String[] {
+          "window_start",
+          "window_end",
+          "device_id",
+          "s1_time",
+          "s1",
+          "s2_time",
+          "s2",
+          "s3_time",
+          "s3"
+        };
+    String[] retArray =
+        new String[] {
+          
"1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.030Z,device_1,1970-01-01T00:00:00.020Z,40.0,1970-01-01T00:00:00.020Z,35.0,1970-01-01T00:00:00.020Z,CRIT,"
+        };
+    tableResultSetEqualTest(
+        "SELECT window_start, window_end, device_id, s1_time, s1, s2_time, s2, 
s3_time, s3 "
+            + "FROM M4(DATA => table3 PARTITION BY device_id ORDER BY time, "
+            + "TIMECOL => 'time', SIZE => 10ms) "
+            + "ORDER BY device_id, window_start, s1_time",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4MissingSize() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, 
TIMECOL => 'time')",
+        "701: Missing required argument: SIZE",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4MissingOrderBy() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id, TIMECOL => 
'time', SIZE => 10ms)",
+        "701: Table argument with set semantics requires an ORDER BY clause.",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4UnexpectedOrderBy() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY s1, 
TIMECOL => 'time', SIZE => 10ms)",
+        "701: The ORDER BY clause of the DATA argument must contain exactly 
the time column specified by the TIMECOL argument.",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4TimeColumnNotFound() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, 
TIMECOL => 'timestamp', SIZE => 10ms)",
+        "701: Required column [timestamp] not found in the source table 
argument.",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4IllegalValueType() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table4 PARTITION BY device_id ORDER BY time, 
TIMECOL => 'time', SIZE => 10ms)",
+        "701: The type of the column [object_col] is not comparable.",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4CountWindowRejectsOrigin() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, 
TIMECOL => 'time', SIZE => 5, ORIGIN => 1970-01-01T08:00:00.000+08:00)",
+        "701: The ORIGIN argument is only supported in time window mode.",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4DescendingOrderByRejected() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time 
DESC, TIMECOL => 'time', SIZE => 10ms)",
+        "701: The ORDER BY clause of the DATA argument must sort the time 
column in ascending order.",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4RejectsNegativeSize() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, 
TIMECOL => 'time', SIZE => -1)",
+        "701: Invalid scalar argument SIZE, should be a positive value",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4RejectsFloatSize() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, 
TIMECOL => 'time', SIZE => 1.5)",
+        "701: Invalid scalar argument 'SIZE'. Expected type INT64, got Double",
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void testM4RejectsNonTimestampTimecol() {
+    tableAssertTestFail(
+        "SELECT * FROM M4(DATA => table1 PARTITION BY device_id ORDER BY time, 
TIMECOL => 's1', SIZE => 10ms)",
+        "701: The type of the column [s1] is not as expected.",
+        DATABASE_NAME);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 4ff58026a94..132a2470299 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -121,6 +121,7 @@ import org.apache.iotdb.commons.queryengine.utils.cte.CteDataStore;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.commons.udf.builtin.relational.tvf.M4TableFunction;
 import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
 import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -5130,7 +5131,8 @@ public class StatementAnalyzer {
         Scope argumentScope = analysis.getScope(argument.getRelation());
         if (argument.isPassThroughColumns()) {
           argumentScope.getRelationType().getAllFields().forEach(fields::add);
-        } else if (argument.getPartitionBy().isPresent()) {
+        } else if 
(!TableBuiltinTableFunction.M4.getFunctionName().equalsIgnoreCase(functionName)
+            && argument.getPartitionBy().isPresent()) {
           argument.getPartitionBy().get().stream()
               .map(expression -> validateAndGetInputField(expression, 
argumentScope))
               .forEach(fields::add);
@@ -5260,9 +5262,149 @@ public class StatementAnalyzer {
               analyzeDefault(parameterSpecification, errorLocation));
         }
       }
+      tryAppendM4ModeArgument(functionName, arguments, 
parameterSpecifications, passedArguments);
       return new ArgumentsAnalysis(passedArguments.buildOrThrow(), 
tableArgumentAnalyses.build());
     }
 
+    private void tryAppendM4ModeArgument(
+        String functionName,
+        List<TableFunctionArgument> arguments,
+        List<ParameterSpecification> parameterSpecifications,
+        ImmutableMap.Builder<String, Argument> passedArguments) {
+      if 
(!TableBuiltinTableFunction.M4.getFunctionName().equalsIgnoreCase(functionName))
 {
+        return;
+      }
+
+      TableFunctionArgument sizeArgument =
+          findTableFunctionArgument(
+              arguments, parameterSpecifications, 
M4TableFunction.SIZE_PARAMETER_NAME);
+      if (!(sizeArgument.getValue() instanceof Expression)) {
+        throw new SemanticException(
+            String.format(
+                "Invalid argument %s. Expected scalar argument",
+                M4TableFunction.SIZE_PARAMETER_NAME));
+      }
+
+      boolean isTimeWindow = sizeArgument.getValue() instanceof 
TimeDurationLiteral;
+      Optional<TableFunctionArgument> slideArgument =
+          findOptionalTableFunctionArgument(
+              arguments, parameterSpecifications, 
M4TableFunction.SLIDE_PARAMETER_NAME);
+      if (slideArgument.isPresent()) {
+        if (!(slideArgument.get().getValue() instanceof Expression)) {
+          throw new SemanticException(
+              String.format(
+                  "Invalid argument %s. Expected scalar argument",
+                  M4TableFunction.SLIDE_PARAMETER_NAME));
+        }
+        boolean isTimeSlide = slideArgument.get().getValue() instanceof 
TimeDurationLiteral;
+        if (isTimeWindow != isTimeSlide) {
+          throw new SemanticException(
+              "The SLIDE argument must have the same window mode as the SIZE 
argument.");
+        }
+      }
+      if (!isTimeWindow
+          && containsTableFunctionArgument(
+              arguments, parameterSpecifications, 
M4TableFunction.ORIGIN_PARAMETER_NAME)) {
+        throw new SemanticException("The ORIGIN argument is only supported in 
time window mode.");
+      }
+      validateM4OrderBySortOrder(arguments, parameterSpecifications);
+      passedArguments.put(
+          M4TableFunction.WINDOW_MODE_PARAMETER_NAME,
+          new ScalarArgument(org.apache.iotdb.udf.api.type.Type.BOOLEAN, 
isTimeWindow));
+    }
+
+    private void validateM4OrderBySortOrder(
+        List<TableFunctionArgument> arguments,
+        List<ParameterSpecification> parameterSpecifications) {
+      Optional<TableFunctionArgument> dataArgument =
+          findOptionalTableFunctionArgument(
+              arguments, parameterSpecifications, 
M4TableFunction.DATA_PARAMETER_NAME);
+      if (!dataArgument.isPresent()
+          || !(dataArgument.get().getValue() instanceof 
TableFunctionTableArgument)) {
+        return;
+      }
+
+      Optional<OrderBy> orderBy =
+          ((TableFunctionTableArgument) 
dataArgument.get().getValue()).getOrderBy();
+      if (!orderBy.isPresent()) {
+        return;
+      }
+
+      for (SortItem sortItem : orderBy.get().getSortItems()) {
+        if (sortItem.getOrdering() != SortItem.Ordering.ASCENDING) {
+          throw new SemanticException(
+              "The ORDER BY clause of the DATA argument must sort the time 
column in ascending order.");
+        }
+      }
+    }
+
+    private boolean containsTableFunctionArgument(
+        List<TableFunctionArgument> arguments,
+        List<ParameterSpecification> parameterSpecifications,
+        String argumentName) {
+      return findOptionalTableFunctionArgument(arguments, 
parameterSpecifications, argumentName)
+          .isPresent();
+    }
+
+    private Optional<TableFunctionArgument> findOptionalTableFunctionArgument(
+        List<TableFunctionArgument> arguments,
+        List<ParameterSpecification> parameterSpecifications,
+        String argumentName) {
+      boolean argumentsPassedByName =
+          arguments.stream().allMatch(argument -> 
argument.getName().isPresent());
+      if (argumentsPassedByName) {
+        return arguments.stream()
+            .filter(
+                argument ->
+                    
argumentName.equalsIgnoreCase(argument.getName().get().getCanonicalValue()))
+            .findFirst();
+      }
+
+      for (int i = 0, size = parameterSpecifications.size(); i < size; i++) {
+        if 
(argumentName.equalsIgnoreCase(parameterSpecifications.get(i).getName())) {
+          if (i >= arguments.size()) {
+            return Optional.empty();
+          }
+          return Optional.of(arguments.get(i));
+        }
+      }
+      return Optional.empty();
+    }
+
+    private TableFunctionArgument findTableFunctionArgument(
+        List<TableFunctionArgument> arguments,
+        List<ParameterSpecification> parameterSpecifications,
+        String argumentName) {
+      if (arguments.isEmpty()) {
+        throw new IllegalStateException("Arguments should never be empty when 
resolving M4 mode");
+      }
+
+      boolean argumentsPassedByName =
+          arguments.stream().allMatch(argument -> 
argument.getName().isPresent());
+      if (argumentsPassedByName) {
+        return arguments.stream()
+            .filter(
+                argument ->
+                    
argumentName.equalsIgnoreCase(argument.getName().get().getCanonicalValue()))
+            .findFirst()
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        String.format("Missing required argument: %s", 
argumentName)));
+      }
+
+      for (int i = 0, size = parameterSpecifications.size(); i < size; i++) {
+        if 
(argumentName.equalsIgnoreCase(parameterSpecifications.get(i).getName())) {
+          if (i >= arguments.size()) {
+            throw new IllegalStateException(
+                String.format("Missing required argument: %s", argumentName));
+          }
+          return arguments.get(i);
+        }
+      }
+      throw new IllegalStateException(String.format("Unknown argument: %s", 
argumentName));
+    }
+
     // append order by time asc for built-in forecast tvf if user doesn't 
specify order by clause
     private void tryUpdateOrderByForForecastByName(
         String functionName,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 268cd43452d..bf09235c5c2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.queryengine.common.SessionInfo;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode;
 import org.apache.iotdb.commons.queryengine.plan.relational.analyzer.NodeRef;
+import 
org.apache.iotdb.commons.queryengine.plan.relational.function.TableBuiltinTableFunction;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName;
 import 
org.apache.iotdb.commons.queryengine.plan.relational.planner.Assignments;
@@ -1548,7 +1549,10 @@ public class RelationPlanner implements AstVisitor<RelationPlan, Void> {
                 symbol ->
                     new TableFunctionNode.PassThroughColumn(symbol, 
partitionBy.contains(symbol)))
             .forEach(passThroughColumns::add);
-      } else if (tableArgument.getPartitionBy().isPresent()) {
+      } else if (!TableBuiltinTableFunction.M4
+              .getFunctionName()
+              .equalsIgnoreCase(functionAnalysis.getFunctionName())
+          && tableArgument.getPartitionBy().isPresent()) {
         tableArgument.getPartitionBy().get().stream()
             // the original symbols for partitioning columns, not coerced
             .map(sourcePlanBuilder::translate)
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java
index 0d680b20a27..9d09ff057b3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java
@@ -481,4 +481,191 @@ public class TableFunctionTest {
       assertEquals("TIMECOL should never be null or empty.", e.getMessage());
     }
   }
+
+  @Test
+  public void testM4TimeWindowMode() {
+    PlanTester planTester = new PlanTester();
+    String sql =
+        "SELECT * FROM M4("
+            + "DATA => table1 PARTITION BY tag1 ORDER BY time, "
+            + "TIMECOL => 'time', "
+            + "SIZE => 1h)";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan =
+        tableScan(
+            "testdb.table1",
+            ImmutableMap.<String, String>builder()
+                .put("time", "time")
+                .put("tag1", "tag1")
+                .put("tag2", "tag2")
+                .put("tag3", "tag3")
+                .put("attr1", "attr1")
+                .put("attr2", "attr2")
+                .put("s1", "s1")
+                .put("s2", "s2")
+                .put("s3", "s3")
+                .buildOrThrow());
+
+    Consumer<TableFunctionProcessorMatcher.Builder> tableFunctionMatcher =
+        builder ->
+            builder
+                .name("m4")
+                .properOutputs(
+                    "window_start",
+                    "window_end",
+                    "m4_tag1",
+                    "m4_tag2_time",
+                    "m4_tag2",
+                    "m4_tag3_time",
+                    "m4_tag3",
+                    "m4_attr1_time",
+                    "m4_attr1",
+                    "m4_attr2_time",
+                    "m4_attr2",
+                    "m4_s1_time",
+                    "m4_s1",
+                    "m4_s2_time",
+                    "m4_s2",
+                    "m4_s3_time",
+                    "m4_s3")
+                .requiredSymbols("time", "tag1", "tag2", "tag3", "attr1", 
"attr2", "s1", "s2", "s3")
+                .handle(
+                    new MapTableFunctionHandle.Builder()
+                        .addProperty("SIZE", 3600000L)
+                        .addProperty("SLIDE", 3600000L)
+                        .addProperty("ORIGIN", 0L)
+                        .addProperty("__M4_WINDOW_MODE", true)
+                        .addProperty("__M4_PARTITION_TYPES", "STRING")
+                        .addProperty(
+                            "__M4_PARTICIPANT_TYPES",
+                            "STRING,STRING,STRING,STRING,INT64,INT64,DOUBLE")
+                        .build());
+
+    assertPlan(
+        logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, 
sort(tableScan))));
+  }
+
+  @Test
+  public void testM4CountWindowMode() {
+    PlanTester planTester = new PlanTester();
+    String sql =
+        "SELECT * FROM M4("
+            + "DATA => table1 PARTITION BY tag1 ORDER BY time, "
+            + "TIMECOL => 'time', "
+            + "SIZE => 5)";
+    LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+    PlanMatchPattern tableScan =
+        tableScan(
+            "testdb.table1",
+            ImmutableMap.<String, String>builder()
+                .put("time", "time")
+                .put("tag1", "tag1")
+                .put("tag2", "tag2")
+                .put("tag3", "tag3")
+                .put("attr1", "attr1")
+                .put("attr2", "attr2")
+                .put("s1", "s1")
+                .put("s2", "s2")
+                .put("s3", "s3")
+                .buildOrThrow());
+
+    Consumer<TableFunctionProcessorMatcher.Builder> tableFunctionMatcher =
+        builder ->
+            builder
+                .name("m4")
+                .properOutputs(
+                    "window_index",
+                    "m4_tag1",
+                    "m4_tag2_time",
+                    "m4_tag2",
+                    "m4_tag3_time",
+                    "m4_tag3",
+                    "m4_attr1_time",
+                    "m4_attr1",
+                    "m4_attr2_time",
+                    "m4_attr2",
+                    "m4_s1_time",
+                    "m4_s1",
+                    "m4_s2_time",
+                    "m4_s2",
+                    "m4_s3_time",
+                    "m4_s3")
+                .requiredSymbols("time", "tag1", "tag2", "tag3", "attr1", 
"attr2", "s1", "s2", "s3")
+                .handle(
+                    new MapTableFunctionHandle.Builder()
+                        .addProperty("SIZE", 5L)
+                        .addProperty("SLIDE", 5L)
+                        .addProperty("__M4_WINDOW_MODE", false)
+                        .addProperty("__M4_PARTITION_TYPES", "STRING")
+                        .addProperty(
+                            "__M4_PARTICIPANT_TYPES",
+                            "STRING,STRING,STRING,STRING,INT64,INT64,DOUBLE")
+                        .build());
+
+    assertPlan(
+        logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, 
sort(tableScan))));
+  }
+
+  @Test
+  public void testM4MissingOrderBy() {
+    String sql = "SELECT * FROM M4(DATA => table1 PARTITION BY tag1, TIMECOL 
=> 'time', SIZE => 5)";
+    try {
+      analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+      fail();
+    } catch (SemanticException e) {
+      assertEquals(
+          "Table argument with set semantics requires an ORDER BY clause.", 
e.getMessage());
+    }
+  }
+
+  @Test
+  public void testM4CountWindowRejectsOrigin() {
+    String sql =
+        "SELECT * FROM M4(DATA => table1 PARTITION BY tag1 ORDER BY time, 
TIMECOL => 'time', SIZE => 5, ORIGIN => 1970-01-01T00:00:00.000+00:00)";
+    try {
+      analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+      fail();
+    } catch (SemanticException e) {
+      assertEquals("The ORIGIN argument is only supported in time window 
mode.", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testM4RejectsMismatchedSlideMode() {
+    String sql =
+        "SELECT * FROM M4(DATA => table1 PARTITION BY tag1 ORDER BY time, 
TIMECOL => 'time', SIZE => 1h, SLIDE => 5)";
+    try {
+      analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+      fail();
+    } catch (SemanticException e) {
+      assertEquals(
+          "The SLIDE argument must have the same window mode as the SIZE 
argument.",
+          e.getMessage());
+    }
+
+    sql =
+        "SELECT * FROM M4(DATA => table1 PARTITION BY tag1 ORDER BY time, 
TIMECOL => 'time', SIZE => 5, SLIDE => 1h)";
+    try {
+      analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+      fail();
+    } catch (SemanticException e) {
+      assertEquals(
+          "The SLIDE argument must have the same window mode as the SIZE 
argument.",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testM4RejectsDescendingOrderBy() {
+    String sql =
+        "SELECT * FROM M4(DATA => table1 PARTITION BY tag1 ORDER BY time DESC, 
TIMECOL => 'time', SIZE => 1h)";
+    try {
+      analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+      fail();
+    } catch (SemanticException e) {
+      assertEquals(
+          "The ORDER BY clause of the DATA argument must sort the time column 
in ascending order.",
+          e.getMessage());
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java
index 43673a2e90c..eb969a2c6ae 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.queryengine.plan.relational.function.tvf.Pattern
 import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.CapacityTableFunction;
 import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.CumulateTableFunction;
 import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction;
+import org.apache.iotdb.commons.udf.builtin.relational.tvf.M4TableFunction;
 import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.SessionTableFunction;
 import org.apache.iotdb.commons.udf.builtin.relational.tvf.TumbleTableFunction;
 import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.VariationTableFunction;
@@ -43,6 +44,7 @@ public enum TableBuiltinTableFunction {
   SESSION("session"),
   VARIATION("variation"),
   CAPACITY("capacity"),
+  M4("m4"),
   FORECAST("forecast"),
   PATTERN_MATCH("pattern_match"),
   CLASSIFY("classify");
@@ -87,6 +89,8 @@ public enum TableBuiltinTableFunction {
         return new PatternMatchTableFunction();
       case "capacity":
         return new CapacityTableFunction();
+      case "m4":
+        return new M4TableFunction();
       case "forecast":
         return new ForecastTableFunction();
       case "classify":
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java
new file mode 100644
index 00000000000..9918ef7933f
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java
@@ -0,0 +1,802 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.commons.exception.SemanticException;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.time.LocalDate;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+import static 
org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
+public class M4TableFunction implements TableFunction {
+
+  public static final String DATA_PARAMETER_NAME = "DATA";
+  public static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+  public static final String SIZE_PARAMETER_NAME = "SIZE";
+  public static final String SLIDE_PARAMETER_NAME = "SLIDE";
+  public static final String ORIGIN_PARAMETER_NAME = "ORIGIN";
+  public static final String WINDOW_MODE_PARAMETER_NAME = "__M4_WINDOW_MODE";
+
+  private static final String OUTPUT_WINDOW_START_COLUMN = "window_start";
+  private static final String OUTPUT_WINDOW_END_COLUMN = "window_end";
+  private static final String OUTPUT_WINDOW_INDEX_COLUMN = "window_index";
+  private static final String PARTITION_TYPES_PROPERTY = 
"__M4_PARTITION_TYPES";
+  private static final String PARTICIPANT_TYPES_PROPERTY = 
"__M4_PARTICIPANT_TYPES";
+  private static final long UNSPECIFIED_SLIDE = Long.MIN_VALUE;
+  private static final long INVALID_INDEX = -1;
+  private static final Set<Type> SUPPORTED_PARTITION_TYPES =
+      new HashSet<>(
+          Arrays.asList(
+              Type.BOOLEAN,
+              Type.INT32,
+              Type.INT64,
+              Type.FLOAT,
+              Type.DOUBLE,
+              Type.TEXT,
+              Type.TIMESTAMP,
+              Type.DATE,
+              Type.BLOB,
+              Type.STRING));
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Arrays.asList(
+        
TableParameterSpecification.builder().name(DATA_PARAMETER_NAME).setSemantics().build(),
+        ScalarParameterSpecification.builder()
+            .name(TIMECOL_PARAMETER_NAME)
+            .type(Type.STRING)
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(SIZE_PARAMETER_NAME)
+            .type(Type.INT64)
+            .addChecker(POSITIVE_LONG_CHECKER)
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(SLIDE_PARAMETER_NAME)
+            .type(Type.INT64)
+            .defaultValue(UNSPECIFIED_SLIDE)
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(ORIGIN_PARAMETER_NAME)
+            .type(Type.TIMESTAMP)
+            .defaultValue(0L)
+            .build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
+    if (tableArgument.getOrderBy().isEmpty()) {
+      throw new SemanticException("Table argument with set semantics requires 
an ORDER BY clause.");
+    }
+
+    String timeColumn =
+        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+    int timeColumnIndex =
+        findColumnIndex(tableArgument, timeColumn, 
Collections.singleton(Type.TIMESTAMP));
+    validateOrderBy(tableArgument, timeColumn);
+
+    List<Integer> partitionIndexes = getPartitionIndexes(tableArgument);
+    Set<Integer> excludedIndexes = new HashSet<>(partitionIndexes);
+    excludedIndexes.add(timeColumnIndex);
+
+    boolean isTimeWindow =
+        arguments.containsKey(WINDOW_MODE_PARAMETER_NAME)
+            && (boolean) ((ScalarArgument) 
arguments.get(WINDOW_MODE_PARAMETER_NAME)).getValue();
+
+    List<Integer> participantIndexes = new ArrayList<>();
+    List<Type> partitionTypes = new ArrayList<>();
+    List<Type> participantTypes = new ArrayList<>();
+    DescribedSchema.Builder schemaBuilder = new DescribedSchema.Builder();
+    if (isTimeWindow) {
+      schemaBuilder
+          .addField(OUTPUT_WINDOW_START_COLUMN, Type.TIMESTAMP)
+          .addField(OUTPUT_WINDOW_END_COLUMN, Type.TIMESTAMP);
+    } else {
+      schemaBuilder.addField(OUTPUT_WINDOW_INDEX_COLUMN, Type.INT64);
+    }
+    for (int partitionIndex : partitionIndexes) {
+      Type type = tableArgument.getFieldTypes().get(partitionIndex);
+      partitionTypes.add(type);
+      
schemaBuilder.addField(tableArgument.getFieldNames().get(partitionIndex).get(), 
type);
+    }
+
+    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+      if (excludedIndexes.contains(i)) {
+        continue;
+      }
+      Type type = tableArgument.getFieldTypes().get(i);
+      String columnName = tableArgument.getFieldNames().get(i).get();
+      if (!isComparableType(type)) {
+        throw new SemanticException(
+            String.format("The type of the column [%s] is not comparable.", 
columnName));
+      }
+      participantIndexes.add(i);
+      participantTypes.add(type);
+      schemaBuilder.addField(columnName + "_time", Type.TIMESTAMP);
+      schemaBuilder.addField(columnName, type);
+    }
+
+    if (participantIndexes.isEmpty()) {
+      throw new SemanticException("No comparable columns found for M4 
calculation.");
+    }
+
+    long size = (long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue();
+    long slide = (long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue();
+    if (slide == UNSPECIFIED_SLIDE) {
+      slide = size;
+    } else if (slide <= 0) {
+      throw new UDFException("Invalid scalar argument SLIDE, should be a 
positive value");
+    }
+
+    MapTableFunctionHandle.Builder handleBuilder =
+        new MapTableFunctionHandle.Builder()
+            .addProperty(WINDOW_MODE_PARAMETER_NAME, isTimeWindow)
+            .addProperty(SIZE_PARAMETER_NAME, size)
+            .addProperty(SLIDE_PARAMETER_NAME, slide)
+            .addProperty(PARTITION_TYPES_PROPERTY, joinTypes(partitionTypes))
+            .addProperty(PARTICIPANT_TYPES_PROPERTY, 
joinTypes(participantTypes));
+    if (isTimeWindow) {
+      handleBuilder.addProperty(
+          ORIGIN_PARAMETER_NAME,
+          ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue());
+    }
+    MapTableFunctionHandle handle = handleBuilder.build();
+
+    List<Integer> requiredColumns = new ArrayList<>();
+    requiredColumns.add(timeColumnIndex);
+    requiredColumns.addAll(partitionIndexes);
+    requiredColumns.addAll(participantIndexes);
+
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(schemaBuilder.build())
+        .requireRecordSnapshot(false)
+        .requiredColumns(DATA_PARAMETER_NAME, requiredColumns)
+        .handle(handle)
+        .build();
+  }
+
+  @Override
+  public TableFunctionHandle createTableFunctionHandle() {
+    return new MapTableFunctionHandle();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(
+      TableFunctionHandle tableFunctionHandle) {
+    MapTableFunctionHandle handle = (MapTableFunctionHandle) 
tableFunctionHandle;
+    boolean isTimeWindow = (boolean) 
handle.getProperty(WINDOW_MODE_PARAMETER_NAME);
+    long size = (long) handle.getProperty(SIZE_PARAMETER_NAME);
+    long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME);
+    long origin = isTimeWindow ? (long) 
handle.getProperty(ORIGIN_PARAMETER_NAME) : 0L;
+    Type[] partitionTypes = parseTypes((String) 
handle.getProperty(PARTITION_TYPES_PROPERTY));
+    Type[] participantTypes = parseTypes((String) 
handle.getProperty(PARTICIPANT_TYPES_PROPERTY));
+
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        M4Column[] partitionColumns = createColumns(partitionTypes, 1);
+        M4Column[] participantColumns = createColumns(participantTypes, 
partitionTypes.length + 1);
+        return isTimeWindow
+            ? new TimeWindowM4DataProcessor(
+                size, slide, origin, partitionColumns, participantColumns)
+            : new CountWindowM4DataProcessor(size, slide, partitionColumns, 
participantColumns);
+      }
+    };
+  }
+
+  private static void validateOrderBy(TableArgument tableArgument, String 
timeColumn) {
+    if (tableArgument.getOrderBy().size() != 1
+        || !tableArgument.getOrderBy().get(0).equalsIgnoreCase(timeColumn)) {
+      throw new SemanticException(
+          "The ORDER BY clause of the DATA argument must contain exactly the 
time column specified by the TIMECOL argument.");
+    }
+  }
+
+  private static List<Integer> getPartitionIndexes(TableArgument 
tableArgument) {
+    List<Integer> indexes = new ArrayList<>();
+    for (String partitionColumn : tableArgument.getPartitionBy()) {
+      indexes.add(findColumnIndex(tableArgument, partitionColumn, 
SUPPORTED_PARTITION_TYPES));
+    }
+    return indexes;
+  }
+
+  // BLOB can be used as a partition column because M4 only needs to 
read/write it there
+  private static boolean isComparableType(Type type) {
+    return type != Type.BLOB && type != Type.OBJECT;
+  }
+
+  private static String joinTypes(List<Type> types) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < types.size(); i++) {
+      if (i > 0) {
+        builder.append(',');
+      }
+      builder.append(types.get(i).name());
+    }
+    return builder.toString();
+  }
+
+  private static Type[] parseTypes(String value) {
+    if (value.isEmpty()) {
+      return new Type[0];
+    }
+    String[] values = value.split(",");
+    Type[] types = new Type[values.length];
+    for (int i = 0; i < values.length; i++) {
+      types[i] = Type.valueOf(values[i]);
+    }
+    return types;
+  }
+
+  private static M4Column[] createColumns(Type[] types, int firstInputIndex) {
+    M4Column[] columns = new M4Column[types.length];
+    for (int i = 0; i < types.length; i++) {
+      columns[i] = new M4Column(firstInputIndex + i, 
ValueOperator.fromType(types[i]));
+    }
+    return columns;
+  }
+
+  private enum ValueOperator {
+    BOOLEAN(Type.BOOLEAN) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getBoolean(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return Boolean.compare((Boolean) left, (Boolean) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeBoolean((Boolean) value);
+      }
+    },
+    INT32(Type.INT32) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getInt(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return Integer.compare((Integer) left, (Integer) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeInt((Integer) value);
+      }
+    },
+    INT64(Type.INT64) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getLong(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return Long.compare((Long) left, (Long) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeLong((Long) value);
+      }
+    },
+    FLOAT(Type.FLOAT) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getFloat(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return Float.compare((Float) left, (Float) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeFloat((Float) value);
+      }
+    },
+    DOUBLE(Type.DOUBLE) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getDouble(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return Double.compare((Double) left, (Double) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeDouble((Double) value);
+      }
+    },
+    TEXT(Type.TEXT) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getBinary(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return ((Binary) left).compareTo((Binary) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeBinary((Binary) value);
+      }
+    },
+    BLOB(Type.BLOB) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getBinary(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return ((Binary) left).compareTo((Binary) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeBinary((Binary) value);
+      }
+    },
+    TIMESTAMP(Type.TIMESTAMP) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getLong(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return Long.compare((Long) left, (Long) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeLong((Long) value);
+      }
+    },
+    DATE(Type.DATE) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getLocalDate(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return ((LocalDate) left).compareTo((LocalDate) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeObject(value);
+      }
+    },
+    STRING(Type.STRING) {
+      @Override
+      Object read(Record record, int index) {
+        return record.getBinary(index);
+      }
+
+      @Override
+      int compare(Object left, Object right) {
+        return ((Binary) left).compareTo((Binary) right);
+      }
+
+      @Override
+      void write(ColumnBuilder builder, Object value) {
+        builder.writeBinary((Binary) value);
+      }
+    };
+
+    private final Type type;
+
+    ValueOperator(Type type) {
+      this.type = type;
+    }
+
+    abstract Object read(Record record, int index);
+
+    abstract int compare(Object left, Object right);
+
+    abstract void write(ColumnBuilder builder, Object value);
+
+    static ValueOperator fromType(Type type) {
+      for (ValueOperator valueOperator : values()) {
+        if (valueOperator.type == type) {
+          return valueOperator;
+        }
+      }
+      throw new IllegalArgumentException("Unsupported M4 value type: " + type);
+    }
+  }
+
+  private static class M4Column {
+    private final int inputIndex;
+    private final ValueOperator valueOperator;
+
+    private M4Column(int inputIndex, ValueOperator valueOperator) {
+      this.inputIndex = inputIndex;
+      this.valueOperator = valueOperator;
+    }
+
+    private boolean isNull(Record record) {
+      return record.isNull(inputIndex);
+    }
+
+    private Object read(Record record) {
+      return valueOperator.read(record, inputIndex);
+    }
+
+    private void write(ColumnBuilder builder, Object value) {
+      valueOperator.write(builder, value);
+    }
+
+    private int compare(Object left, Object right) {
+      return valueOperator.compare(left, right);
+    }
+  }
+
+  private static class Candidate {
+    private long index = INVALID_INDEX;
+    private long time;
+    private Object value;
+
+    private void set(long index, long time, Object value) {
+      this.index = index;
+      this.time = time;
+      this.value = value;
+    }
+  }
+
+  private static class ColumnWindowState {
+    private final Candidate first = new Candidate();
+    private final Candidate last = new Candidate();
+    private final Candidate bottom = new Candidate();
+    private final Candidate top = new Candidate();
+
+    private void update(long rowIndex, long time, Object value, M4Column 
column) {
+      if (first.index == INVALID_INDEX) {
+        first.set(rowIndex, time, value);
+        last.set(rowIndex, time, value);
+        bottom.set(rowIndex, time, value);
+        top.set(rowIndex, time, value);
+        return;
+      }
+
+      last.set(rowIndex, time, value);
+      if (column.compare(value, bottom.value) < 0) {
+        bottom.set(rowIndex, time, value);
+      }
+      if (column.compare(value, top.value) > 0) {
+        top.set(rowIndex, time, value);
+      }
+    }
+
+    private boolean hasOutput() {
+      return first.index != INVALID_INDEX;
+    }
+
+    private List<Candidate> getSortedCandidates() {
+      if (!hasOutput()) {
+        return Collections.emptyList();
+      }
+      List<Candidate> candidates = new ArrayList<>(Arrays.asList(first, last, 
bottom, top));
+      Set<Long> emittedTimestamps = new HashSet<>();
+      candidates.removeIf(candidate -> !emittedTimestamps.add(candidate.time));
+      candidates.sort(Comparator.comparingLong(candidate -> candidate.time));
+      return candidates;
+    }
+  }
+
+  private abstract static class WindowState {
+    protected final ColumnWindowState[] columnStates;
+    protected final Object[] partitionValues;
+
+    private WindowState(int partitionColumnCount, int participantColumnCount) {
+      partitionValues = new Object[partitionColumnCount];
+      columnStates = new ColumnWindowState[participantColumnCount];
+      for (int i = 0; i < participantColumnCount; i++) {
+        columnStates[i] = new ColumnWindowState();
+      }
+    }
+
+    protected abstract void writeWindowColumns(List<ColumnBuilder> 
properColumnBuilders);
+
+    protected abstract int getWindowColumnCount();
+  }
+
+  private static class TimeWindowState extends WindowState {
+    private final long windowStart;
+    private final long endExclusive;
+
+    private TimeWindowState(
+        long windowStart, long endExclusive, int partitionColumnCount, int 
participantColumnCount) {
+      super(partitionColumnCount, participantColumnCount);
+      this.windowStart = windowStart;
+      this.endExclusive = endExclusive;
+    }
+
+    @Override
+    protected void writeWindowColumns(List<ColumnBuilder> 
properColumnBuilders) {
+      properColumnBuilders.get(0).writeLong(windowStart);
+      properColumnBuilders.get(1).writeLong(endExclusive);
+    }
+
+    @Override
+    protected int getWindowColumnCount() {
+      return 2;
+    }
+  }
+
+  private static class CountWindowState extends WindowState {
+    private final long endExclusive;
+    private final long windowIndex;
+
+    private CountWindowState(
+        long endExclusive, long windowIndex, int partitionColumnCount, int 
participantColumnCount) {
+      super(partitionColumnCount, participantColumnCount);
+      this.endExclusive = endExclusive;
+      this.windowIndex = windowIndex;
+    }
+
+    @Override
+    protected void writeWindowColumns(List<ColumnBuilder> 
properColumnBuilders) {
+      properColumnBuilders.get(0).writeLong(windowIndex);
+    }
+
+    @Override
+    protected int getWindowColumnCount() {
+      return 1;
+    }
+  }
+
+  /**
+   * Shared driver for the M4 window processors.
+   *
+   * <p>For every incoming record it reads the timestamp from column 0, hands the record to {@link
+   * #processRecord} for window bookkeeping, and then advances the running record index. It also
+   * provides the helpers subclasses use to fold a record into a window and to emit a window.
+   */
+  private abstract static class AbstractM4DataProcessor implements TableFunctionDataProcessor {
+    protected final long size;
+    protected final long slide;
+    protected final M4Column[] partitionColumns;
+    protected final M4Column[] participantColumns;
+    /** Zero-based index of the record currently being processed. */
+    protected long curIndex = 0;
+
+    protected AbstractM4DataProcessor(
+        long size, long slide, M4Column[] partitionColumns, M4Column[] participantColumns) {
+      this.size = size;
+      this.slide = slide;
+      this.partitionColumns = partitionColumns;
+      this.participantColumns = participantColumns;
+    }
+
+    @Override
+    public final void process(
+        Record input,
+        List<ColumnBuilder> properColumnBuilders,
+        ColumnBuilder passThroughIndexBuilder) {
+      long time = input.getLong(0);
+      processRecord(input, time, properColumnBuilders);
+      curIndex++;
+    }
+
+    /**
+     * Handles one record: subclasses decide which windows to emit, open and update.
+     *
+     * @param input the current record
+     * @param time the value of column 0 of {@code input}
+     * @param properColumnBuilders builders that receive the rows of emitted windows
+     */
+    protected abstract void processRecord(
+        Record input, long time, List<ColumnBuilder> properColumnBuilders);
+
+    /**
+     * Folds one record into a window: remembers the first non-null value seen for each partition
+     * column and feeds every non-null participant value into that column's state.
+     */
+    protected final void updateWindow(WindowState windowState, Record input, long time) {
+      for (int i = 0; i < partitionColumns.length; i++) {
+        M4Column column = partitionColumns[i];
+        // Keep an already captured value; otherwise skip records where the column is null.
+        if (windowState.partitionValues[i] != null || column.isNull(input)) {
+          continue;
+        }
+        windowState.partitionValues[i] = column.read(input);
+      }
+      for (int i = 0; i < participantColumns.length; i++) {
+        M4Column column = participantColumns[i];
+        if (column.isNull(input)) {
+          continue;
+        }
+        windowState.columnStates[i].update(curIndex, time, column.read(input), column);
+      }
+    }
+
+    /**
+     * Emits the rows of one window.
+     *
+     * <p>The number of rows equals the largest candidate count among the participant columns, so
+     * a window without any candidate produces nothing. Each row holds the window columns, the
+     * partition values, and one (time, value) pair per participant column; columns that have run
+     * out of candidates are padded with nulls.
+     */
+    protected final void outputWindow(
+        WindowState windowState, List<ColumnBuilder> properColumnBuilders) {
+      List<List<Candidate>> sortedByColumn = new ArrayList<>();
+      int maxCandidates = 0;
+      for (ColumnWindowState state : windowState.columnStates) {
+        List<Candidate> sorted = state.getSortedCandidates();
+        sortedByColumn.add(sorted);
+        if (sorted.size() > maxCandidates) {
+          maxCandidates = sorted.size();
+        }
+      }
+
+      for (int row = 0; row < maxCandidates; row++) {
+        windowState.writeWindowColumns(properColumnBuilders);
+        // Window columns come first; everything else is appended after them.
+        int out = windowState.getWindowColumnCount();
+
+        for (int p = 0; p < partitionColumns.length; p++) {
+          Object partitionValue = windowState.partitionValues[p];
+          ColumnBuilder builder = properColumnBuilders.get(out++);
+          if (partitionValue != null) {
+            partitionColumns[p].write(builder, partitionValue);
+          } else {
+            builder.appendNull();
+          }
+        }
+
+        for (int c = 0; c < participantColumns.length; c++) {
+          List<Candidate> sorted = sortedByColumn.get(c);
+          ColumnBuilder timeBuilder = properColumnBuilders.get(out++);
+          ColumnBuilder valueBuilder = properColumnBuilders.get(out++);
+          if (row >= sorted.size()) {
+            timeBuilder.appendNull();
+            valueBuilder.appendNull();
+            continue;
+          }
+          Candidate candidate = sorted.get(row);
+          timeBuilder.writeLong(candidate.time);
+          participantColumns[c].write(valueBuilder, candidate.value);
+        }
+      }
+    }
+
+    /** Returns the exclusive end of the window that begins at {@code windowStart}. */
+    protected final long getWindowEnd(long windowStart) {
+      return windowStart + size;
+    }
+  }
+
+  /**
+   * M4 processor for time-based windows {@code [start, start + size)} whose starts are aligned
+   * to {@code origin + k * slide}.
+   *
+   * <p>A window is only opened once a record falls inside it, and it is emitted as soon as a
+   * record at or beyond its exclusive end arrives, or when {@link #finish} is called.
+   * NOTE(review): the bookkeeping looks like it expects records in non-decreasing time order;
+   * confirm against the caller.
+   */
+  private static class TimeWindowM4DataProcessor extends AbstractM4DataProcessor {
+    private final Deque<TimeWindowState> activeWindows = new ArrayDeque<>();
+    private final long origin;
+    private boolean nextWindowStartInitialized = false;
+    private long nextWindowStart;
+
+    private TimeWindowM4DataProcessor(
+        long size,
+        long slide,
+        long origin,
+        M4Column[] partitionColumns,
+        M4Column[] participantColumns) {
+      super(size, slide, partitionColumns, participantColumns);
+      this.origin = origin;
+    }
+
+    @Override
+    protected void processRecord(
+        Record input, long time, List<ColumnBuilder> properColumnBuilders) {
+      // Emit every window that ended at or before this record's timestamp.
+      TimeWindowState oldest;
+      while ((oldest = activeWindows.peekFirst()) != null && oldest.endExclusive <= time) {
+        outputWindow(activeWindows.removeFirst(), properColumnBuilders);
+      }
+
+      // Smallest aligned start whose window still ends after this timestamp.
+      long earliestStart = origin + (Math.floorDiv(time - origin - size, slide) + 1) * slide;
+      while (getWindowEnd(earliestStart) <= time) {
+        earliestStart += slide;
+      }
+
+      // The creation cursor never moves backwards; adopt the candidate only when it is ahead.
+      if (!nextWindowStartInitialized || nextWindowStart < earliestStart) {
+        nextWindowStart = earliestStart;
+      }
+      nextWindowStartInitialized = true;
+
+      // Open each not-yet-created window that contains this timestamp.
+      while (nextWindowStart <= time) {
+        long windowEnd = getWindowEnd(nextWindowStart);
+        if (windowEnd <= time) {
+          break;
+        }
+        activeWindows.addLast(
+            new TimeWindowState(
+                nextWindowStart, windowEnd, partitionColumns.length, participantColumns.length));
+        nextWindowStart += slide;
+      }
+
+      // Every window still active receives the record.
+      for (TimeWindowState window : activeWindows) {
+        updateWindow(window, input, time);
+      }
+    }
+
+    /** Emits all windows that are still open, oldest first. */
+    @Override
+    public void finish(
+        List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) {
+      for (TimeWindowState remaining = activeWindows.pollFirst();
+          remaining != null;
+          remaining = activeWindows.pollFirst()) {
+        outputWindow(remaining, properColumnBuilders);
+      }
+    }
+  }
+
+  private static class CountWindowM4DataProcessor extends 
AbstractM4DataProcessor {
+    private final Deque<CountWindowState> activeWindows = new ArrayDeque<>();
+    private long rowCount = 0;
+    private long nextWindowStart = 0;
+    private long nextWindowIndex = 0;
+
+    private CountWindowM4DataProcessor(
+        long size, long slide, M4Column[] partitionColumns, M4Column[] 
participantColumns) {
+      super(size, slide, partitionColumns, participantColumns);
+    }
+
+    @Override
+    protected void processRecord(
+        Record input, long time, List<ColumnBuilder> properColumnBuilders) {
+      while (!activeWindows.isEmpty() && 
activeWindows.peekFirst().endExclusive <= rowCount) {
+        outputWindow(activeWindows.removeFirst(), properColumnBuilders);
+      }
+
+      while (getWindowEnd(nextWindowStart) <= rowCount) {
+        nextWindowStart += slide;
+      }
+
+      while (nextWindowStart <= rowCount) {
+        activeWindows.addLast(
+            new CountWindowState(
+                getWindowEnd(nextWindowStart),
+                nextWindowIndex++,
+                partitionColumns.length,
+                participantColumns.length));
+        nextWindowStart += slide;
+      }
+
+      for (CountWindowState activeWindow : activeWindows) {
+        updateWindow(activeWindow, input, time);
+      }
+      rowCount++;
+    }
+
+    @Override
+    public void finish(
+        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      while (!activeWindows.isEmpty()) {
+        outputWindow(activeWindows.removeFirst(), properColumnBuilders);
+      }
+    }
+  }
+}

Reply via email to