This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-3831 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 70f704ea57629b3b7c4bd760b7d7204f7a1586e6 Author: JackieTien97 <[email protected]> AuthorDate: Tue Nov 15 10:43:44 2022 +0800 [IOTDB-3831] Fix TTL doesn't take effect in last query --- .../iotdb/db/it/last/IoTDBLastWithTTLIT.java | 106 +++++++++++++++++++++ .../db/engine/querycontext/QueryDataSource.java | 5 + .../iotdb/db/engine/storagegroup/DataRegion.java | 4 + .../plan/planner/LocalExecutionPlanContext.java | 13 ++- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 3 +- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 7 +- 6 files changed, 134 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java new file mode 100644 index 0000000000..3d9b6a75e0 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/last/IoTDBLastWithTTLIT.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.it.last; + +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBLastWithTTLIT { + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initBeforeClass(); + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("insert into root.sg.d1(time, s1, s2) values(1, 1, 1)"); + statement.execute("insert into root.sg.d2(time, s1, s2) aligned values(2, 1, 1)"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanAfterClass(); + } + + @Test + public void withTTL() { + String[] retArray = + new String[] { + "1,root.sg.d1.s1,1.0,FLOAT", + "1,root.sg.d1.s2,1.0,FLOAT", + "2,root.sg.d2.s1,1.0,FLOAT", + "2,root.sg.d2.s2,1.0,FLOAT" + }; + + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + try (ResultSet resultSet = + statement.executeQuery("select last * from root.sg.* order by timeseries asc")) { + int cnt = 0; + while (resultSet.next()) { + String ans = + resultSet.getString(ColumnHeaderConstant.TIME) + + "," + + resultSet.getString(ColumnHeaderConstant.TIMESERIES) + + "," + + resultSet.getString(ColumnHeaderConstant.VALUE) + + "," + + resultSet.getString(ColumnHeaderConstant.DATATYPE); + assertEquals(retArray[cnt++], ans); + } + assertEquals(retArray.length, cnt); + } + + statement.execute("set ttl to root.sg 1"); + + try (ResultSet resultSet = + statement.executeQuery("select last * from root.sg.* order by timeseries asc")) { + assertFalse(resultSet.next()); + } + + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java index 995bf93961..63db473dc1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java @@ -76,6 +76,11 @@ public class QueryDataSource { /** @return an updated filter concerning TTL */ public Filter updateFilterUsingTTL(Filter filter) { + return updateFilterUsingTTL(filter, dataTTL); + } + + /** @return an updated filter concerning TTL */ + public static Filter updateFilterUsingTTL(Filter filter, long dataTTL) { if (dataTTL != Long.MAX_VALUE) { if (filter != null) { filter = new AndFilter(filter, TimeFilter.gtEq(DateTimeUtils.currentTime() - dataTTL)); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index 3bcf0a916e..d252a70280 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -3369,6 +3369,10 @@ public class DataRegion { return dataRegionInfo.getMemCost(); } + public long getDataTTL() { + return dataTTL; + } + @TestOnly public ILastFlushTimeMap getLastFlushTimeMap() { return lastFlushTimeMap; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java index 6b644fb431..4a3d844b73 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java @@ -46,6 +46,8 @@ public class LocalExecutionPlanContext { private final Map<String, Set<String>> allSensorsMap; // Used to lock corresponding query resources private final List<DataSourceOperator> sourceOperators; + + private final long dataRegionTTL; private ISinkHandle sinkHandle; private int nextOperatorId = 0; @@ -62,16 +64,19 @@ public class LocalExecutionPlanContext { private final RuleBasedTimeSliceAllocator timeSliceAllocator; + // for data region public LocalExecutionPlanContext( - TypeProvider typeProvider, FragmentInstanceContext instanceContext) { + TypeProvider typeProvider, FragmentInstanceContext instanceContext, long dataRegionTTL) { this.typeProvider = typeProvider; this.instanceContext = instanceContext; this.paths = new ArrayList<>(); this.allSensorsMap = new HashMap<>(); this.sourceOperators = new ArrayList<>(); this.timeSliceAllocator = new RuleBasedTimeSliceAllocator(); + this.dataRegionTTL = dataRegionTTL; } + // for schema region public LocalExecutionPlanContext(FragmentInstanceContext instanceContext) { this.instanceContext = instanceContext; this.paths = new ArrayList<>(); @@ -81,6 +86,8 @@ public class LocalExecutionPlanContext { // only used in `order by heat` this.timeSliceAllocator = new RuleBasedTimeSliceAllocator(); + // there is no ttl in schema region, so we don't care this field + this.dataRegionTTL = Long.MAX_VALUE; } public int getNextOperatorId() { @@ -158,4 +165,8 @@ public class LocalExecutionPlanContext { public boolean isNeedUpdateLastCache() { return needUpdateLastCache; } + + public long getDataRegionTTL() { + return dataRegionTTL; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index f292777567..2121d1547e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -63,7 +63,8 @@ public class LocalExecutionPlanner { Filter timeFilter, DataRegion dataRegion) throws MemoryNotEnoughException { - LocalExecutionPlanContext context = new LocalExecutionPlanContext(types, instanceContext); + LocalExecutionPlanContext context = + new LocalExecutionPlanContext(types, instanceContext, dataRegion.getDataTTL()); Operator root = plan.accept(new OperatorTreeGenerator(), context); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index a02cbd773a..c7964ae828 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -199,6 +199,7 @@ import java.util.Objects; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.engine.querycontext.QueryDataSource.updateFilterUsingTTL; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSize; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; @@ -1618,7 +1619,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP if (timeValuePair == null) { // last value is not cached return createUpdateLastCacheOperator(node, context, node.getSeriesPath()); } else if (!LastQueryUtil.satisfyFilter( - context.getLastQueryTimeFilter(), timeValuePair)) { // cached last value is not satisfied + updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()), + timeValuePair)) { // cached last value is not satisfied boolean isFilterGtOrGe = (context.getLastQueryTimeFilter() instanceof Gt @@ -1701,7 +1703,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return createUpdateLastCacheOperator( node, context, node.getSeriesPath().getMeasurementPath()); } else if (!LastQueryUtil.satisfyFilter( - context.getLastQueryTimeFilter(), timeValuePair)) { // cached last value is not satisfied + updateFilterUsingTTL(context.getLastQueryTimeFilter(), context.getDataRegionTTL()), + timeValuePair)) { // cached last value is not satisfied boolean isFilterGtOrGe = (context.getLastQueryTimeFilter() instanceof Gt
