This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new db0b6fd14f [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF 
(#6928)
db0b6fd14f is described below

commit db0b6fd14f6b154c0562719e44d7fefa3b3b2607
Author: AACEPT <[email protected]>
AuthorDate: Mon Aug 15 20:50:27 2022 +0800

    [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF (#6928)
---
 .../db/query/udf/example/ExampleUDFConstant.java   |   2 +
 .../iotdb/db/query/udf/example/WindowStartEnd.java |  16 +-
 .../iotdb/itbase/constant/UDFTestConstant.java     |   2 +
 .../db/it/udf/IoTDBUDFSessionWindowQueryIT.java    | 283 +++++++++++++++++++++
 .../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java     |   7 +-
 .../plan/expression/binary/BinaryExpression.java   |   1 +
 .../plan/expression/leaf/TimeSeriesOperand.java    |   1 +
 .../visitor/IntermediateLayerVisitor.java          |   1 +
 .../intermediate/ConstantIntermediateLayer.java    |   8 +
 .../dag/intermediate/IntermediateLayer.java        |   8 +
 .../MultiInputColumnIntermediateLayer.java         | 119 +++++++++
 ...InputColumnMultiReferenceIntermediateLayer.java | 117 +++++++++
 ...nputColumnSingleReferenceIntermediateLayer.java | 117 +++++++++
 .../api/customizer/strategy/AccessStrategy.java    |   5 +-
 .../strategy/SessionTimeWindowAccessStrategy.java  |  95 +++++++
 15 files changed, 778 insertions(+), 4 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
index 87984d2db8..3dfb8303a0 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
@@ -23,9 +23,11 @@ public class ExampleUDFConstant {
   public static final String ACCESS_STRATEGY_ROW_BY_ROW = "row-by-row";
   public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
   public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
+  public static final String ACCESS_STRATEGY_SESSION = "session";
   public static final String WINDOW_SIZE_KEY = "windowSize";
   public static final String TIME_INTERVAL_KEY = "timeInterval";
   public static final String SLIDING_STEP_KEY = "slidingStep";
+  public static final String SESSION_GAP_KEY = "sessionGap";
   public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
   public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
index fb1f66ec4b..bd3a369365 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.udf.api.access.RowWindow;
 import org.apache.iotdb.udf.api.collector.PointCollector;
 import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
 import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import 
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.type.Type;
@@ -44,7 +45,8 @@ public class WindowStartEnd implements UDTF {
                   parameters.getInt(ExampleUDFConstant.SLIDING_STEP_KEY))
               : new SlidingSizeWindowAccessStrategy(
                   parameters.getInt(ExampleUDFConstant.WINDOW_SIZE_KEY)));
-    } else {
+    } else if (ExampleUDFConstant.ACCESS_STRATEGY_SLIDING_TIME.equals(
+        parameters.getString(ExampleUDFConstant.ACCESS_STRATEGY_KEY))) {
       configurations.setAccessStrategy(
           parameters.hasAttribute(ExampleUDFConstant.SLIDING_STEP_KEY)
                   && 
parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY)
@@ -56,6 +58,18 @@ public class WindowStartEnd implements UDTF {
                   
parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY))
               : new SlidingTimeWindowAccessStrategy(
                   parameters.getLong(ExampleUDFConstant.TIME_INTERVAL_KEY)));
+    } else if (ExampleUDFConstant.ACCESS_STRATEGY_SESSION.equals(
+        parameters.getString(ExampleUDFConstant.ACCESS_STRATEGY_KEY))) {
+      configurations.setAccessStrategy(
+          parameters.hasAttribute(ExampleUDFConstant.SESSION_GAP_KEY)
+                  && 
parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY)
+                  && 
parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY)
+              ? new SessionTimeWindowAccessStrategy(
+                  
parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY),
+                  
parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY),
+                  parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY))
+              : new SessionTimeWindowAccessStrategy(
+                  parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY)));
     }
   }
 
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
index 6613eed077..31dfeeb661 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
@@ -23,9 +23,11 @@ public class UDFTestConstant {
   public static final String ACCESS_STRATEGY_ROW_BY_ROW = "row-by-row";
   public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
   public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
+  public static final String ACCESS_STRATEGY_SESSION = "session";
   public static final String WINDOW_SIZE_KEY = "windowSize";
   public static final String TIME_INTERVAL_KEY = "timeInterval";
   public static final String SLIDING_STEP_KEY = "slidingStep";
+  public static final String SESSION_GAP_KEY = "sessionGap";
   public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
   public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
new file mode 100644
index 0000000000..5698d7d4bc
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+  protected static final int ITERATION_TIMES = 1000;
+
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    enableSeqSpaceCompaction = 
ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction = 
ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction = 
ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+    ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+    ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    
ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    
ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    
ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(100)
+        .setUdfTransformerMemoryBudgetInMB(100)
+        .setUdfReaderMemoryBudgetInMB(100);
+  }
+
+  private static void createTimeSeries() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.vehicle");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with 
datatype=INT32,encoding=PLAIN");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void generateData() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i 
== 54 || i == 996
+            || i == 997 || i == 998) {
+          continue;
+        }
+        statement.execute(
+            (String.format("insert into root.vehicle.d1(timestamp,s3) 
values(%d,%d)", i, i)));
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function window_start_end as 
'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private void testSessionTimeWindowSS(
+      String sessionGap, long[] windowStart, long[] windowEnd, Long 
displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from 
root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', 
'%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin,
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      int cnt = 0;
+      while (resultSet.next()) {
+        Assert.assertEquals(resultSet.getLong(1), windowStart[cnt]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[cnt]);
+        cnt++;
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSessionTimeWindowSS1() {
+    String sessionGap = "2";
+    long[] windowStart = new long[] {0, 8, 55, 999};
+    long[] windowEnd = new long[] {4, 50, 995, 999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS2() {
+    String sessionGap = "5";
+    long[] windowStart = new long[] {0, 55};
+    long[] windowEnd = new long[] {50, 999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS3() {
+    String sessionGap = "6";
+    long[] windowStart = new long[] {0};
+    long[] windowEnd = new long[] {999};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS4() {
+    String sessionGap = "2";
+    Long displayBegin = 1L;
+    Long displayEnd = 993L;
+    long[] windowStart = new long[] {1, 8, 55};
+    long[] windowEnd = new long[] {4, 50, 992};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, 
displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS5() {
+    String sessionGap = "5";
+    Long displayBegin = 43L;
+    Long displayEnd = 100L;
+    long[] windowStart = new long[] {43, 55};
+    long[] windowEnd = new long[] {50, 99};
+    testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin, 
displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS6() {
+    String sessionGap = "1";
+    Long displayBegin = 2L;
+    Long displayEnd = 20000L;
+    ArrayList<Long> windowStart = new ArrayList<>();
+    ArrayList<Long> windowEnd = new ArrayList<>();
+    for (long i = displayBegin; i < ITERATION_TIMES; i++) {
+      if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i == 
54 || i == 996
+          || i == 997 || i == 998) {
+        continue;
+      }
+      windowStart.add(i);
+      windowEnd.add(i);
+    }
+    testSessionTimeWindowSS(
+        sessionGap,
+        windowStart.stream().mapToLong(t -> t).toArray(),
+        windowEnd.stream().mapToLong(t -> t).toArray(),
+        displayBegin,
+        displayEnd);
+  }
+
+  private void testSessionTimeWindowSSOutOfRange(
+      String sessionGap, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s') from 
root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s', 
'%s'='%s') from root.vehicle.d1",
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_SESSION,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin,
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd,
+              UDFTestConstant.SESSION_GAP_KEY,
+              sessionGap);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      int count = 0;
+      while (resultSet.next()) {
+        count++;
+      }
+      assertEquals(count, 0);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testSessionTimeWindowSS7() {
+    String sessionGap = "2";
+    Long displayBegin = 1000000L;
+    Long displayEnd = 2000000L;
+    testSessionTimeWindowSSOutOfRange(sessionGap, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testSessionTimeWindowSS8() {
+    String sessionGap = "2";
+    Long displayBegin = -2000000L;
+    Long displayEnd = -100L;
+    testSessionTimeWindowSSOutOfRange(sessionGap, displayBegin, displayEnd);
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
index 4285c87bfd..c654f71c72 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
@@ -499,8 +499,11 @@ public class IoTDBUDFWindowQueryIT {
 
     sql =
         String.format(
-            "select window_start_end(s1, '%s'='%s') from root.vehicle.d1",
-            UDFTestConstant.TIME_INTERVAL_KEY, timeInterval);
+            "select window_start_end(s1, '%s'='%s', '%s'='%s') from 
root.vehicle.d1",
+            UDFTestConstant.ACCESS_STRATEGY_KEY,
+            UDFTestConstant.ACCESS_STRATEGY_SLIDING_TIME,
+            UDFTestConstant.TIME_INTERVAL_KEY,
+            timeInterval);
 
     try (Connection conn = EnvFactory.getEnv().getConnection();
         Statement statement = conn.createStatement();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
index 0ee8b1ab84..5a0732dbb3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
@@ -203,6 +203,7 @@ public abstract class BinaryExpression extends Expression {
     rightExpression.bindInputLayerColumnIndexWithExpression(inputLocations);
 
     final String digest = toString();
+
     if (inputLocations.containsKey(digest)) {
       inputColumnIndex = 
inputLocations.get(digest).get(0).getValueColumnIndex();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
index 264bb5381b..7195dc992e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
@@ -104,6 +104,7 @@ public class TimeSeriesOperand extends LeafOperand {
   public void bindInputLayerColumnIndexWithExpression(
       Map<String, List<InputLocation>> inputLocations) {
     final String digest = toString();
+
     if (inputLocations.containsKey(digest)) {
       inputColumnIndex = 
inputLocations.get(digest).get(0).getValueColumnIndex();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
index 75095e51d0..f7d9a4e6c5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
@@ -396,6 +396,7 @@ public class IntermediateLayerVisitor
         return new 
UDFQueryRowTransformer(udfInputIntermediateLayer.constructRowReader(), 
executor);
       case SLIDING_SIZE_WINDOW:
       case SLIDING_TIME_WINDOW:
+      case SESSION_TIME_WINDOW:
         return new UDFQueryRowWindowTransformer(
             udfInputIntermediateLayer.constructRowWindowReader(
                 accessStrategy, context.memoryAssigner.assign()),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
index 41a19905bd..440c2e6fbd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
 import org.apache.iotdb.db.mpp.transformation.api.LayerRowReader;
 import org.apache.iotdb.db.mpp.transformation.api.LayerRowWindowReader;
 import org.apache.iotdb.db.mpp.transformation.dag.input.ConstantInputReader;
+import 
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 
@@ -63,4 +64,11 @@ public class ConstantIntermediateLayer extends 
IntermediateLayer {
     // Not allowed since the timestamp of a constant row is not defined.
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    // Not allowed since the timestamp of a constant row is not defined.
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
index d90955120a..0a8cd2e6b4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
 import org.apache.iotdb.db.mpp.transformation.api.LayerRowReader;
 import org.apache.iotdb.db.mpp.transformation.api.LayerRowWindowReader;
 import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
+import 
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 
@@ -59,6 +60,9 @@ public abstract class IntermediateLayer {
       case SLIDING_SIZE_WINDOW:
         return constructRowSlidingSizeWindowReader(
             (SlidingSizeWindowAccessStrategy) strategy, memoryBudgetInMB);
+      case SESSION_TIME_WINDOW:
+        return constructRowSessionTimeWindowReader(
+            (SessionTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
       default:
         throw new IllegalStateException(
             "Unexpected access strategy: " + strategy.getAccessStrategyType());
@@ -73,6 +77,10 @@ public abstract class IntermediateLayer {
       SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
       throws QueryProcessException, IOException;
 
+  protected abstract LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException, IOException;
+
   @Override
   public String toString() {
     return expression.toString();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index dcd467e0a7..4edf946b97 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.udf.api.access.Row;
 import org.apache.iotdb.udf.api.access.RowWindow;
+import 
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 
@@ -653,4 +654,122 @@ public class MultiInputColumnIntermediateLayer extends 
IntermediateLayer
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final IUDFInputDataSet udfInputDataSet = this;
+    final ElasticSerializableRowRecordList rowRecordList =
+        new ElasticSerializableRowRecordList(
+            dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+        new 
ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (rowRecordList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, 
rowRecordList.getTime(0));
+          hasAtLeastOneRow = rowRecordList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (rowRecordList.getTime(rowRecordList.size() - 1) < 
displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (rowRecordList.getTime(rowRecordList.size() - 2) >= 
displayWindowBegin
+                && rowRecordList.getTime(rowRecordList.size() - 1)
+                        - rowRecordList.getTime(rowRecordList.size() - 2)
+                    >= sessionTimeGap) {
+              nextIndexEnd = rowRecordList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == 
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == 
YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = rowRecordList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = rowRecordList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which 
will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin 
<= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+          if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the 
timestamp of the query result
+          // set
+          if (i == rowRecordList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, 
nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException, QueryProcessException {
+        if (nextIndexEnd < rowRecordList.size()) {
+          nextWindowTimeBegin = rowRecordList.getTime(nextIndexEnd);
+        }
+        rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return dataTypes;
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 18130cf77a..9171fcc6ed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.udf.api.access.Row;
 import org.apache.iotdb.udf.api.access.RowWindow;
+import 
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 
@@ -542,4 +543,120 @@ public class 
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final SafetyPile safetyPile = safetyLine.addSafetyPile();
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(
+                    parentLayerPointReaderDataType, parentLayerPointReader, 
tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, 
tvList.getTime(0));
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldPoint(
+                  parentLayerPointReaderDataType, parentLayerPointReader, 
tvList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+                && tvList.getTime(tvList.size() - 1) - 
tvList.getTime(tvList.size() - 2)
+                    >= sessionTimeGap) {
+              nextIndexEnd = tvList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == 
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == 
YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = tvList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which 
will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin 
<= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the 
timestamp of the query result
+          // set
+          if (i == tvList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, 
nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException, QueryProcessException {
+        if (nextIndexEnd < tvList.size()) {
+          nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+        }
+        safetyPile.moveForwardTo(nextIndexBegin + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {parentLayerPointReaderDataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index d22bf0a091..6f46cd0f01 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializab
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.udf.api.access.Row;
 import org.apache.iotdb.udf.api.access.RowWindow;
+import 
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import 
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 
@@ -413,4 +414,120 @@ public class 
SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+      SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final long sessionTimeGap = strategy.getSessionTimeGap();
+
+    final ElasticSerializableTVList tvList =
+        ElasticSerializableTVList.newElasticSerializableTVList(
+            dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, 
tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, 
tvList.getTime(0));
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, 
tvList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+                && tvList.getTime(tvList.size() - 1) - 
tvList.getTime(tvList.size() - 2)
+                    >= sessionTimeGap) {
+              nextIndexEnd = tvList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == 
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == 
YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = tvList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which 
will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin 
<= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the 
timestamp of the query result
+          // set
+          if (i == tvList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, 
nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException {
+        if (nextIndexEnd < tvList.size()) {
+          nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+        }
+        tvList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
 }
diff --git 
a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
 
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
index c3730f9d4b..d543559a5f 100644
--- 
a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
+++ 
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
@@ -41,7 +41,10 @@ public interface AccessStrategy {
     SLIDING_TIME_WINDOW,
 
     /** @see SlidingSizeWindowAccessStrategy */
-    SLIDING_SIZE_WINDOW
+    SLIDING_SIZE_WINDOW,
+
+    /** @see SessionTimeWindowAccessStrategy */
+    SESSION_TIME_WINDOW
   }
 
   /**
diff --git 
a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/SessionTimeWindowAccessStrategy.java
 
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/SessionTimeWindowAccessStrategy.java
new file mode 100644
index 0000000000..d85b4b0d4c
--- /dev/null
+++ 
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/SessionTimeWindowAccessStrategy.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.customizer.strategy;
+
+import java.time.ZoneId;
+
+public class SessionTimeWindowAccessStrategy implements AccessStrategy {
+
+  private final long displayWindowBegin;
+  private final long displayWindowEnd;
+  private final long sessionTimeGap;
+
+  private ZoneId zoneId;
+
+  /**
+   * @param displayWindowBegin displayWindowBegin < displayWindowEnd
+   * @param displayWindowEnd displayWindowBegin < displayWindowEnd
+   * @param sessionTimeGap 0 < sessionTimeGap
+   */
+  public SessionTimeWindowAccessStrategy(
+      long displayWindowBegin, long displayWindowEnd, long sessionTimeGap) {
+    this.displayWindowBegin = displayWindowBegin;
+    this.displayWindowEnd = displayWindowEnd;
+    this.sessionTimeGap = sessionTimeGap;
+  }
+
+  /**
+   * Display window begin will be set to the same as the minimum timestamp of 
the query result set,
+   * and display window end will be set to the same as the maximum timestamp 
of the query result
+   * set.
+   *
+   * @param sessionTimeGap 0 < sessionTimeGap
+   */
+  public SessionTimeWindowAccessStrategy(long sessionTimeGap) {
+    this.displayWindowBegin = Long.MIN_VALUE;
+    this.displayWindowEnd = Long.MAX_VALUE;
+    this.sessionTimeGap = sessionTimeGap;
+  }
+
+  @Override
+  public void check() {
+    if (sessionTimeGap <= 0) {
+      throw new RuntimeException(
+          String.format("Parameter sessionTimeGap(%d) should be positive.", 
sessionTimeGap));
+    }
+    if (displayWindowEnd < displayWindowBegin) {
+      throw new RuntimeException(
+          String.format(
+              "displayWindowEnd(%d) < displayWindowBegin(%d)",
+              displayWindowEnd, displayWindowBegin));
+    }
+  }
+
+  @Override
+  public AccessStrategyType getAccessStrategyType() {
+    return AccessStrategyType.SESSION_TIME_WINDOW;
+  }
+
+  public long getDisplayWindowBegin() {
+    return displayWindowBegin;
+  }
+
+  public long getDisplayWindowEnd() {
+    return displayWindowEnd;
+  }
+
+  public long getSessionTimeGap() {
+    return sessionTimeGap;
+  }
+
+  public ZoneId getZoneId() {
+    return zoneId;
+  }
+
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
+}

Reply via email to