This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new db0b6fd14f [IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF
(#6928)
db0b6fd14f is described below
commit db0b6fd14f6b154c0562719e44d7fefa3b3b2607
Author: AACEPT <[email protected]>
AuthorDate: Mon Aug 15 20:50:27 2022 +0800
[IOTDB-4073] Add SessionTimeWindowAccessStrategy in UDF (#6928)
---
.../db/query/udf/example/ExampleUDFConstant.java | 2 +
.../iotdb/db/query/udf/example/WindowStartEnd.java | 16 +-
.../iotdb/itbase/constant/UDFTestConstant.java | 2 +
.../db/it/udf/IoTDBUDFSessionWindowQueryIT.java | 283 +++++++++++++++++++++
.../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java | 7 +-
.../plan/expression/binary/BinaryExpression.java | 1 +
.../plan/expression/leaf/TimeSeriesOperand.java | 1 +
.../visitor/IntermediateLayerVisitor.java | 1 +
.../intermediate/ConstantIntermediateLayer.java | 8 +
.../dag/intermediate/IntermediateLayer.java | 8 +
.../MultiInputColumnIntermediateLayer.java | 119 +++++++++
...InputColumnMultiReferenceIntermediateLayer.java | 117 +++++++++
...nputColumnSingleReferenceIntermediateLayer.java | 117 +++++++++
.../api/customizer/strategy/AccessStrategy.java | 5 +-
.../strategy/SessionTimeWindowAccessStrategy.java | 95 +++++++
15 files changed, 778 insertions(+), 4 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
index 87984d2db8..3dfb8303a0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
@@ -23,9 +23,11 @@ public class ExampleUDFConstant {
public static final String ACCESS_STRATEGY_ROW_BY_ROW = "row-by-row";
public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
+ public static final String ACCESS_STRATEGY_SESSION = "session";
public static final String WINDOW_SIZE_KEY = "windowSize";
public static final String TIME_INTERVAL_KEY = "timeInterval";
public static final String SLIDING_STEP_KEY = "slidingStep";
+ public static final String SESSION_GAP_KEY = "sessionGap";
public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
index fb1f66ec4b..bd3a369365 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
+import
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
@@ -44,7 +45,8 @@ public class WindowStartEnd implements UDTF {
parameters.getInt(ExampleUDFConstant.SLIDING_STEP_KEY))
: new SlidingSizeWindowAccessStrategy(
parameters.getInt(ExampleUDFConstant.WINDOW_SIZE_KEY)));
- } else {
+ } else if (ExampleUDFConstant.ACCESS_STRATEGY_SLIDING_TIME.equals(
+ parameters.getString(ExampleUDFConstant.ACCESS_STRATEGY_KEY))) {
configurations.setAccessStrategy(
parameters.hasAttribute(ExampleUDFConstant.SLIDING_STEP_KEY)
&&
parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY)
@@ -56,6 +58,18 @@ public class WindowStartEnd implements UDTF {
parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY))
: new SlidingTimeWindowAccessStrategy(
parameters.getLong(ExampleUDFConstant.TIME_INTERVAL_KEY)));
+ } else if (ExampleUDFConstant.ACCESS_STRATEGY_SESSION.equals(
+ parameters.getString(ExampleUDFConstant.ACCESS_STRATEGY_KEY))) {
+ configurations.setAccessStrategy(
+ parameters.hasAttribute(ExampleUDFConstant.SESSION_GAP_KEY)
+ &&
parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY)
+ &&
parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY)
+ ? new SessionTimeWindowAccessStrategy(
+
parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY),
+
parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY),
+ parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY))
+ : new SessionTimeWindowAccessStrategy(
+ parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY)));
}
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
index 6613eed077..31dfeeb661 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
@@ -23,9 +23,11 @@ public class UDFTestConstant {
public static final String ACCESS_STRATEGY_ROW_BY_ROW = "row-by-row";
public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
+ public static final String ACCESS_STRATEGY_SESSION = "session";
public static final String WINDOW_SIZE_KEY = "windowSize";
public static final String TIME_INTERVAL_KEY = "timeInterval";
public static final String SLIDING_STEP_KEY = "slidingStep";
+ public static final String SESSION_GAP_KEY = "sessionGap";
public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
new file mode 100644
index 0000000000..5698d7d4bc
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.constant.UDFTestConstant;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBUDFSessionWindowQueryIT {
+
+ protected static final int ITERATION_TIMES = 1000;
+
+ protected static boolean enableSeqSpaceCompaction;
+ protected static boolean enableUnseqSpaceCompaction;
+ protected static boolean enableCrossSpaceCompaction;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ enableSeqSpaceCompaction =
ConfigFactory.getConfig().isEnableSeqSpaceCompaction();
+ enableUnseqSpaceCompaction =
ConfigFactory.getConfig().isEnableUnseqSpaceCompaction();
+ enableCrossSpaceCompaction =
ConfigFactory.getConfig().isEnableCrossSpaceCompaction();
+ ConfigFactory.getConfig().setEnableSeqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(false);
+ ConfigFactory.getConfig().setEnableCrossSpaceCompaction(false);
+ ConfigFactory.getConfig()
+ .setUdfCollectorMemoryBudgetInMB(5)
+ .setUdfTransformerMemoryBudgetInMB(5)
+ .setUdfReaderMemoryBudgetInMB(5);
+ EnvFactory.getEnv().initBeforeClass();
+ createTimeSeries();
+ generateData();
+ registerUDF();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanAfterClass();
+
ConfigFactory.getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+
ConfigFactory.getConfig().setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+
ConfigFactory.getConfig().setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+ ConfigFactory.getConfig()
+ .setUdfCollectorMemoryBudgetInMB(100)
+ .setUdfTransformerMemoryBudgetInMB(100)
+ .setUdfReaderMemoryBudgetInMB(100);
+ }
+
+ private static void createTimeSeries() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.vehicle");
+ statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with
datatype=INT32,encoding=PLAIN");
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ private static void generateData() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ for (int i = 0; i < ITERATION_TIMES; ++i) {
+ if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i
== 54 || i == 996
+ || i == 997 || i == 998) {
+ continue;
+ }
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s3)
values(%d,%d)", i, i)));
+ }
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ private static void registerUDF() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "create function window_start_end as
'org.apache.iotdb.db.query.udf.example.WindowStartEnd'");
+ } catch (SQLException throwable) {
+ fail(throwable.getMessage());
+ }
+ }
+
+ private void testSessionTimeWindowSS(
+ String sessionGap, long[] windowStart, long[] windowEnd, Long
displayBegin, Long displayEnd) {
+ String sql;
+ if (displayBegin == null) {
+ sql =
+ String.format(
+ "select window_start_end(s3, '%s'='%s', '%s'='%s') from
root.vehicle.d1",
+ UDFTestConstant.ACCESS_STRATEGY_KEY,
+ UDFTestConstant.ACCESS_STRATEGY_SESSION,
+ UDFTestConstant.SESSION_GAP_KEY,
+ sessionGap);
+ } else {
+ sql =
+ String.format(
+ "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s',
'%s'='%s') from root.vehicle.d1",
+ UDFTestConstant.ACCESS_STRATEGY_KEY,
+ UDFTestConstant.ACCESS_STRATEGY_SESSION,
+ UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+ displayBegin,
+ UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+ displayEnd,
+ UDFTestConstant.SESSION_GAP_KEY,
+ sessionGap);
+ }
+
+ try (Connection conn = EnvFactory.getEnv().getConnection();
+ Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ assertEquals(2, resultSet.getMetaData().getColumnCount());
+ int cnt = 0;
+ while (resultSet.next()) {
+ Assert.assertEquals(resultSet.getLong(1), windowStart[cnt]);
+ Assert.assertEquals(resultSet.getLong(2), windowEnd[cnt]);
+ cnt++;
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSessionTimeWindowSS1() {
+ String sessionGap = "2";
+ long[] windowStart = new long[] {0, 8, 55, 999};
+ long[] windowEnd = new long[] {4, 50, 995, 999};
+ testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+ }
+
+ @Test
+ public void testSessionTimeWindowSS2() {
+ String sessionGap = "5";
+ long[] windowStart = new long[] {0, 55};
+ long[] windowEnd = new long[] {50, 999};
+ testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+ }
+
+ @Test
+ public void testSessionTimeWindowSS3() {
+ String sessionGap = "6";
+ long[] windowStart = new long[] {0};
+ long[] windowEnd = new long[] {999};
+ testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, null, null);
+ }
+
+ @Test
+ public void testSessionTimeWindowSS4() {
+ String sessionGap = "2";
+ Long displayBegin = 1L;
+ Long displayEnd = 993L;
+ long[] windowStart = new long[] {1, 8, 55};
+ long[] windowEnd = new long[] {4, 50, 992};
+ testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin,
displayEnd);
+ }
+
+ @Test
+ public void testSessionTimeWindowSS5() {
+ String sessionGap = "5";
+ Long displayBegin = 43L;
+ Long displayEnd = 100L;
+ long[] windowStart = new long[] {43, 55};
+ long[] windowEnd = new long[] {50, 99};
+ testSessionTimeWindowSS(sessionGap, windowStart, windowEnd, displayBegin,
displayEnd);
+ }
+
+ @Test
+ public void testSessionTimeWindowSS6() {
+ String sessionGap = "1";
+ Long displayBegin = 2L;
+ Long displayEnd = 20000L;
+ ArrayList<Long> windowStart = new ArrayList<>();
+ ArrayList<Long> windowEnd = new ArrayList<>();
+ for (long i = displayBegin; i < ITERATION_TIMES; i++) {
+ if (i == 5 || i == 6 || i == 7 || i == 51 || i == 52 || i == 53 || i ==
54 || i == 996
+ || i == 997 || i == 998) {
+ continue;
+ }
+ windowStart.add(i);
+ windowEnd.add(i);
+ }
+ testSessionTimeWindowSS(
+ sessionGap,
+ windowStart.stream().mapToLong(t -> t).toArray(),
+ windowEnd.stream().mapToLong(t -> t).toArray(),
+ displayBegin,
+ displayEnd);
+ }
+
+ private void testSessionTimeWindowSSOutOfRange(
+ String sessionGap, Long displayBegin, Long displayEnd) {
+ String sql;
+ if (displayBegin == null) {
+ sql =
+ String.format(
+ "select window_start_end(s3, '%s'='%s', '%s'='%s') from
root.vehicle.d1",
+ UDFTestConstant.ACCESS_STRATEGY_KEY,
+ UDFTestConstant.ACCESS_STRATEGY_SESSION,
+ UDFTestConstant.SESSION_GAP_KEY,
+ sessionGap);
+ } else {
+ sql =
+ String.format(
+ "select window_start_end(s3, '%s'='%s', '%s'='%s', '%s'='%s',
'%s'='%s') from root.vehicle.d1",
+ UDFTestConstant.ACCESS_STRATEGY_KEY,
+ UDFTestConstant.ACCESS_STRATEGY_SESSION,
+ UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+ displayBegin,
+ UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+ displayEnd,
+ UDFTestConstant.SESSION_GAP_KEY,
+ sessionGap);
+ }
+
+ try (Connection conn = EnvFactory.getEnv().getConnection();
+ Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql)) {
+ assertEquals(2, resultSet.getMetaData().getColumnCount());
+ int count = 0;
+ while (resultSet.next()) {
+ count++;
+ }
+ assertEquals(count, 0);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSessionTimeWindowSS7() {
+ String sessionGap = "2";
+ Long displayBegin = 1000000L;
+ Long displayEnd = 2000000L;
+ testSessionTimeWindowSSOutOfRange(sessionGap, displayBegin, displayEnd);
+ }
+
+ @Test
+ public void testSessionTimeWindowSS8() {
+ String sessionGap = "2";
+ Long displayBegin = -2000000L;
+ Long displayEnd = -100L;
+ testSessionTimeWindowSSOutOfRange(sessionGap, displayBegin, displayEnd);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
index 4285c87bfd..c654f71c72 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java
@@ -499,8 +499,11 @@ public class IoTDBUDFWindowQueryIT {
sql =
String.format(
- "select window_start_end(s1, '%s'='%s') from root.vehicle.d1",
- UDFTestConstant.TIME_INTERVAL_KEY, timeInterval);
+ "select window_start_end(s1, '%s'='%s', '%s'='%s') from
root.vehicle.d1",
+ UDFTestConstant.ACCESS_STRATEGY_KEY,
+ UDFTestConstant.ACCESS_STRATEGY_SLIDING_TIME,
+ UDFTestConstant.TIME_INTERVAL_KEY,
+ timeInterval);
try (Connection conn = EnvFactory.getEnv().getConnection();
Statement statement = conn.createStatement();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
index 0ee8b1ab84..5a0732dbb3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/binary/BinaryExpression.java
@@ -203,6 +203,7 @@ public abstract class BinaryExpression extends Expression {
rightExpression.bindInputLayerColumnIndexWithExpression(inputLocations);
final String digest = toString();
+
if (inputLocations.containsKey(digest)) {
inputColumnIndex =
inputLocations.get(digest).get(0).getValueColumnIndex();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
index 264bb5381b..7195dc992e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/leaf/TimeSeriesOperand.java
@@ -104,6 +104,7 @@ public class TimeSeriesOperand extends LeafOperand {
public void bindInputLayerColumnIndexWithExpression(
Map<String, List<InputLocation>> inputLocations) {
final String digest = toString();
+
if (inputLocations.containsKey(digest)) {
inputColumnIndex =
inputLocations.get(digest).get(0).getValueColumnIndex();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
index 75095e51d0..f7d9a4e6c5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
@@ -396,6 +396,7 @@ public class IntermediateLayerVisitor
return new
UDFQueryRowTransformer(udfInputIntermediateLayer.constructRowReader(),
executor);
case SLIDING_SIZE_WINDOW:
case SLIDING_TIME_WINDOW:
+ case SESSION_TIME_WINDOW:
return new UDFQueryRowWindowTransformer(
udfInputIntermediateLayer.constructRowWindowReader(
accessStrategy, context.memoryAssigner.assign()),
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
index 41a19905bd..440c2e6fbd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowWindowReader;
import org.apache.iotdb.db.mpp.transformation.dag.input.ConstantInputReader;
+import
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -63,4 +64,11 @@ public class ConstantIntermediateLayer extends
IntermediateLayer {
// Not allowed since the timestamp of a constant row is not defined.
throw new UnsupportedOperationException();
}
+
+ @Override
+ protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+ SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ // Not allowed since the timestamp of a constant row is not defined.
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
index d90955120a..0a8cd2e6b4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowWindowReader;
import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
+import
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -59,6 +60,9 @@ public abstract class IntermediateLayer {
case SLIDING_SIZE_WINDOW:
return constructRowSlidingSizeWindowReader(
(SlidingSizeWindowAccessStrategy) strategy, memoryBudgetInMB);
+ case SESSION_TIME_WINDOW:
+ return constructRowSessionTimeWindowReader(
+ (SessionTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
default:
throw new IllegalStateException(
"Unexpected access strategy: " + strategy.getAccessStrategyType());
@@ -73,6 +77,10 @@ public abstract class IntermediateLayer {
SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException, IOException;
+ protected abstract LayerRowWindowReader constructRowSessionTimeWindowReader(
+ SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException, IOException;
+
@Override
public String toString() {
return expression.toString();
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index dcd467e0a7..4edf946b97 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -36,6 +36,7 @@ import
org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowWindow;
+import
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -653,4 +654,122 @@ public class MultiInputColumnIntermediateLayer extends
IntermediateLayer
}
};
}
+
+ @Override
+ protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+ SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException {
+ final long displayWindowBegin = strategy.getDisplayWindowBegin();
+ final long displayWindowEnd = strategy.getDisplayWindowEnd();
+ final long sessionTimeGap = strategy.getSessionTimeGap();
+
+ final IUDFInputDataSet udfInputDataSet = this;
+ final ElasticSerializableRowRecordList rowRecordList =
+ new ElasticSerializableRowRecordList(
+ dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+ final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
+ new
ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
+
+ return new LayerRowWindowReader() {
+
+ private boolean isFirstIteration = true;
+ private boolean hasAtLeastOneRow = false;
+
+ private long nextWindowTimeBegin = displayWindowBegin;
+ private long nextWindowTimeEnd = 0;
+ private int nextIndexBegin = 0;
+ private int nextIndexEnd = 1;
+
+ @Override
+ public YieldableState yield() throws IOException, QueryProcessException {
+ if (isFirstIteration) {
+ if (rowRecordList.size() == 0) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+ if (yieldableState != YieldableState.YIELDABLE) {
+ return yieldableState;
+ }
+ }
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
rowRecordList.getTime(0));
+ hasAtLeastOneRow = rowRecordList.size() != 0;
+ isFirstIteration = false;
+ }
+
+ if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ while (rowRecordList.getTime(rowRecordList.size() - 1) <
displayWindowEnd) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
+ if (yieldableState == YieldableState.YIELDABLE) {
+ if (rowRecordList.getTime(rowRecordList.size() - 2) >=
displayWindowBegin
+ && rowRecordList.getTime(rowRecordList.size() - 1)
+ - rowRecordList.getTime(rowRecordList.size() - 2)
+ >= sessionTimeGap) {
+ nextIndexEnd = rowRecordList.size() - 1;
+ break;
+ } else {
+ nextIndexEnd++;
+ }
+ } else if (yieldableState ==
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+ return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+ } else if (yieldableState ==
YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+ nextIndexEnd = rowRecordList.size();
+ break;
+ }
+ }
+
+ nextWindowTimeEnd = rowRecordList.getTime(nextIndexEnd - 1);
+
+ if (nextIndexBegin == nextIndexEnd) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // Only if encounter user set the strategy's displayWindowBegin, which
will go into the for
+ // loop to find the true index of the first window begin.
+ // For other situation, we will only go into if (nextWindowTimeBegin
<= tvList.getTime(i))
+ // once.
+ for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
+ if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
+ nextIndexBegin = i;
+ break;
+ }
+ // The first window's beginning time is greater than all the
timestamp of the query result
+ // set
+ if (i == rowRecordList.size() - 1) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+ }
+
+ window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin,
nextWindowTimeEnd);
+
+ return YieldableState.YIELDABLE;
+ }
+
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ return false;
+ }
+
+ @Override
+ public void readyForNext() throws IOException, QueryProcessException {
+ if (nextIndexEnd < rowRecordList.size()) {
+ nextWindowTimeBegin = rowRecordList.getTime(nextIndexEnd);
+ }
+ rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
+ nextIndexBegin = nextIndexEnd;
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return dataTypes;
+ }
+
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 18130cf77a..9171fcc6ed 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowWindow;
+import
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -542,4 +543,120 @@ public class
SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
}
};
}
+
+ @Override
+ protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+ SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ final long displayWindowBegin = strategy.getDisplayWindowBegin();
+ final long displayWindowEnd = strategy.getDisplayWindowEnd();
+ final long sessionTimeGap = strategy.getSessionTimeGap();
+
+ final SafetyPile safetyPile = safetyLine.addSafetyPile();
+ final ElasticSerializableTVListBackedSingleColumnWindow window =
+ new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+ return new LayerRowWindowReader() {
+
+ private boolean isFirstIteration = true;
+ private boolean hasAtLeastOneRow = false;
+
+ private long nextWindowTimeBegin = displayWindowBegin;
+ private long nextWindowTimeEnd = 0;
+ private int nextIndexBegin = 0;
+ private int nextIndexEnd = 1;
+
+ @Override
+ public YieldableState yield() throws IOException, QueryProcessException {
+ if (isFirstIteration) {
+ if (tvList.size() == 0) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(
+ parentLayerPointReaderDataType, parentLayerPointReader,
tvList);
+ if (yieldableState != YieldableState.YIELDABLE) {
+ return yieldableState;
+ }
+ }
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
tvList.getTime(0));
+ hasAtLeastOneRow = tvList.size() != 0;
+ isFirstIteration = false;
+ }
+
+ if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(
+ parentLayerPointReaderDataType, parentLayerPointReader,
tvList);
+ if (yieldableState == YieldableState.YIELDABLE) {
+ if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+ && tvList.getTime(tvList.size() - 1) -
tvList.getTime(tvList.size() - 2)
+ >= sessionTimeGap) {
+ nextIndexEnd = tvList.size() - 1;
+ break;
+ } else {
+ nextIndexEnd++;
+ }
+ } else if (yieldableState ==
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+ return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+ } else if (yieldableState ==
YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+ nextIndexEnd = tvList.size();
+ break;
+ }
+ }
+
+ nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+ if (nextIndexBegin == nextIndexEnd) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // Only if encounter user set the strategy's displayWindowBegin, which
will go into the for
+ // loop to find the true index of the first window begin.
+ // For other situation, we will only go into if (nextWindowTimeBegin
<= tvList.getTime(i))
+ // once.
+ for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+ if (nextWindowTimeBegin <= tvList.getTime(i)) {
+ nextIndexBegin = i;
+ break;
+ }
+ // The first window's beginning time is greater than all the
timestamp of the query result
+ // set
+ if (i == tvList.size() - 1) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+ }
+
+ window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin,
nextWindowTimeEnd);
+
+ return YieldableState.YIELDABLE;
+ }
+
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ return false;
+ }
+
+ @Override
+ public void readyForNext() throws IOException, QueryProcessException {
+ if (nextIndexEnd < tvList.size()) {
+ nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+ }
+ safetyPile.moveForwardTo(nextIndexBegin + 1);
+ tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+ nextIndexBegin = nextIndexEnd;
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return new TSDataType[] {parentLayerPointReaderDataType};
+ }
+
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index d22bf0a091..6f46cd0f01 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializab
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowWindow;
+import
org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import
org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -413,4 +414,120 @@ public class
SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
}
};
}
+
+ // Builds a LayerRowWindowReader that serves the points pulled from parentLayerPointReader
+ // as session time windows. Points are cached in an ElasticSerializableTVList created with
+ // memoryBudgetInMB; a window is closed as soon as two neighbouring cached timestamps are at
+ // least sessionTimeGap apart. displayWindowBegin, displayWindowEnd and sessionTimeGap are
+ // read from the given strategy.
+ @Override
+ protected LayerRowWindowReader constructRowSessionTimeWindowReader(
+ SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+
+ final long displayWindowBegin = strategy.getDisplayWindowBegin();
+ final long displayWindowEnd = strategy.getDisplayWindowEnd();
+ final long sessionTimeGap = strategy.getSessionTimeGap();
+
+ final ElasticSerializableTVList tvList =
+ ElasticSerializableTVList.newElasticSerializableTVList(
+ dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+ final ElasticSerializableTVListBackedSingleColumnWindow window =
+ new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+ return new LayerRowWindowReader() {
+
+ // True until the first yield() call has initialised nextWindowTimeBegin.
+ private boolean isFirstIteration = true;
+ private boolean hasAtLeastOneRow = false;
+
+ // Time range and tvList index range handed to window.seek(...). The last row of a window
+ // is read at nextIndexEnd - 1 (see nextWindowTimeEnd below).
+ private long nextWindowTimeBegin = displayWindowBegin;
+ private long nextWindowTimeEnd = 0;
+ private int nextIndexBegin = 0;
+ private int nextIndexEnd = 1;
+
+ // Searches the next session window. Returns YIELDABLE once window has been positioned,
+ // NOT_YIELDABLE_WAITING_FOR_DATA when the parent reader reports it is waiting for data, and
+ // NOT_YIELDABLE_NO_MORE_DATA when no further window can be produced.
+ @Override
+ public YieldableState yield() throws IOException, QueryProcessException {
+ if (isFirstIteration) {
+ // Make sure one point is cached before tvList.getTime(0) is read; any state other than
+ // YIELDABLE is handed back to the caller and this branch is retried on the next call.
+ if (tvList.size() == 0) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader,
tvList);
+ if (yieldableState != YieldableState.YIELDABLE) {
+ return yieldableState;
+ }
+ }
+ // The first window cannot begin before the first cached timestamp.
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
tvList.getTime(0));
+ // NOTE(review): presumably always true here, since a YIELDABLE yieldPoint should have
+ // appended a point - confirm against LayerCacheUtils.
+ hasAtLeastOneRow = tvList.size() != 0;
+ isFirstIteration = false;
+ }
+
+ // Nothing to serve when no row exists or the next window would begin at or after
+ // displayWindowEnd.
+ if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // Keep pulling points while the newest cached timestamp is before displayWindowEnd.
+ while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader,
tvList);
+ if (yieldableState == YieldableState.YIELDABLE) {
+ // Compare the two newest cached timestamps: when the older one is not before
+ // displayWindowBegin and the distance reaches sessionTimeGap, the window ends in front
+ // of the newest point; otherwise the window is extended by one row.
+ if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+ && tvList.getTime(tvList.size() - 1) -
tvList.getTime(tvList.size() - 2)
+ >= sessionTimeGap) {
+ nextIndexEnd = tvList.size() - 1;
+ break;
+ } else {
+ nextIndexEnd++;
+ }
+ } else if (yieldableState ==
YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+ return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+ } else if (yieldableState ==
YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+ // The parent is exhausted: the window reaches to the end of the cached rows.
+ nextIndexEnd = tvList.size();
+ break;
+ }
+ }
+
+ // The window's end time is the timestamp of its last row.
+ nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+ // An empty index range means there is no further window.
+ if (nextIndexBegin == nextIndexEnd) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // The for loop below only has to scan forward when the user set the strategy's
+ // displayWindowBegin: in that case it finds the true index of the first window's begin.
+ // In every other situation the check if (nextWindowTimeBegin <= tvList.getTime(i))
+ // already succeeds on the first iteration.
+ for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+ if (nextWindowTimeBegin <= tvList.getTime(i)) {
+ nextIndexBegin = i;
+ break;
+ }
+ // The first window's beginning time is greater than all the timestamps of the
+ // query result set
+ if (i == tvList.size() - 1) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+ }
+
+ window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin,
nextWindowTimeEnd);
+
+ return YieldableState.YIELDABLE;
+ }
+
+ // Always returns false. The window search of this reader is implemented in yield().
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ return false;
+ }
+
+ // Moves the reader state past the window that was just served.
+ @Override
+ public void readyForNext() throws IOException {
+ // If a row exists at nextIndexEnd, its timestamp becomes the begin time of the next window.
+ if (nextIndexEnd < tvList.size()) {
+ nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+ }
+ // Uses the begin index of the window just served (nextIndexBegin is reassigned only below).
+ tvList.setEvictionUpperBound(nextIndexBegin + 1);
+ nextIndexBegin = nextIndexEnd;
+ }
+
+ // Returns a new one-element array that holds this layer's dataType.
+ @Override
+ public TSDataType[] getDataTypes() {
+ return new TSDataType[] {dataType};
+ }
+
+ // Returns the window object that yield() positions through window.seek(...).
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
}
diff --git
a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
index c3730f9d4b..d543559a5f 100644
---
a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
+++
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
@@ -41,7 +41,10 @@ public interface AccessStrategy {
SLIDING_TIME_WINDOW,
/** @see SlidingSizeWindowAccessStrategy */
- SLIDING_SIZE_WINDOW
+ SLIDING_SIZE_WINDOW,
+
+ /** @see SessionTimeWindowAccessStrategy */
+ SESSION_TIME_WINDOW
}
/**
diff --git
a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/SessionTimeWindowAccessStrategy.java
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/SessionTimeWindowAccessStrategy.java
new file mode 100644
index 0000000000..d85b4b0d4c
--- /dev/null
+++
b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/SessionTimeWindowAccessStrategy.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.customizer.strategy;
+
+import java.time.ZoneId;
+
+/**
+ * Access strategy of type {@link AccessStrategyType#SESSION_TIME_WINDOW}.
+ *
+ * <p>It carries three values: the begin and the end of a display window and a session time gap.
+ * The constructors only store their arguments; validation happens in {@link #check()}.
+ */
+public class SessionTimeWindowAccessStrategy implements AccessStrategy {
+
+  private final long displayWindowBegin;
+  private final long displayWindowEnd;
+  private final long sessionTimeGap;
+
+  /** Optional time zone; it is only stored here and never read inside this class. */
+  private ZoneId zoneId;
+
+  /**
+   * Creates a strategy with an explicit display window.
+   *
+   * @param displayWindowBegin begin of the display window; {@link #check()} rejects a value that
+   *     is greater than {@code displayWindowEnd}
+   * @param displayWindowEnd end of the display window; {@link #check()} rejects a value that is
+   *     less than {@code displayWindowBegin}
+   * @param sessionTimeGap session time gap; {@link #check()} requires it to be positive
+   */
+  public SessionTimeWindowAccessStrategy(
+      long displayWindowBegin, long displayWindowEnd, long sessionTimeGap) {
+    this.sessionTimeGap = sessionTimeGap;
+    this.displayWindowEnd = displayWindowEnd;
+    this.displayWindowBegin = displayWindowBegin;
+  }
+
+  /**
+   * Creates a strategy whose display window spans the whole {@code long} range: the begin is
+   * {@link Long#MIN_VALUE} and the end is {@link Long#MAX_VALUE}.
+   *
+   * @param sessionTimeGap session time gap; {@link #check()} requires it to be positive
+   */
+  public SessionTimeWindowAccessStrategy(long sessionTimeGap) {
+    this(Long.MIN_VALUE, Long.MAX_VALUE, sessionTimeGap);
+  }
+
+  /**
+   * Validates the stored parameters.
+   *
+   * @throws RuntimeException if {@code sessionTimeGap} is not positive, or if {@code
+   *     displayWindowEnd} is less than {@code displayWindowBegin}
+   */
+  @Override
+  public void check() {
+    // The gap is validated first, so its message wins when both parameters are invalid.
+    if (0 >= sessionTimeGap) {
+      final String gapMessage =
+          String.format("Parameter sessionTimeGap(%d) should be positive.", sessionTimeGap);
+      throw new RuntimeException(gapMessage);
+    }
+    if (displayWindowBegin > displayWindowEnd) {
+      final String rangeMessage =
+          String.format(
+              "displayWindowEnd(%d) < displayWindowBegin(%d)",
+              displayWindowEnd, displayWindowBegin);
+      throw new RuntimeException(rangeMessage);
+    }
+  }
+
+  /** @return always {@link AccessStrategyType#SESSION_TIME_WINDOW} */
+  @Override
+  public AccessStrategyType getAccessStrategyType() {
+    return AccessStrategyType.SESSION_TIME_WINDOW;
+  }
+
+  /** @return the display window begin given at construction */
+  public long getDisplayWindowBegin() {
+    return this.displayWindowBegin;
+  }
+
+  /** @return the display window end given at construction */
+  public long getDisplayWindowEnd() {
+    return this.displayWindowEnd;
+  }
+
+  /** @return the session time gap given at construction */
+  public long getSessionTimeGap() {
+    return this.sessionTimeGap;
+  }
+
+  /** @return the zone id set through {@link #setZoneId(ZoneId)}, or {@code null} if none was set */
+  public ZoneId getZoneId() {
+    return this.zoneId;
+  }
+
+  /** @param zoneId the zone id to store */
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
+}