This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch FixFileTimeIndexBug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e9768b18175b26ea5844602f6eb95bca11199e00 Author: JackieTien97 <[email protected]> AuthorDate: Tue Dec 17 10:02:40 2024 +0800 Fix query scan will return duplicated timestamp or unordered timestamp while TsFileResource degrading --- .../it/env/cluster/config/MppCommonConfig.java | 6 ++ .../env/cluster/config/MppSharedCommonConfig.java | 7 ++ .../it/env/remote/config/RemoteCommonConfig.java | 5 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 2 + .../apache/iotdb/db/it/IoTDBFileTimeIndexIT.java | 117 +++++++++++++++++++++ .../execution/operator/source/SeriesScanUtil.java | 30 +++++- .../dataregion/read/QueryDataSource.java | 14 ++- .../dataregion/tsfile/TsFileResource.java | 12 ++- 8 files changed, 186 insertions(+), 7 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java index 791de1ce8c4..f1117c43e57 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java @@ -508,6 +508,12 @@ public class MppCommonConfig extends MppBaseConfig implements CommonConfig { return this; } + @Override + public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) { + setProperty("chunk_timeseriesmeta_free_memory_proportion", queryMemoryProportion); + return this; + } + // For part of the log directory public String getClusterConfigStr() { return fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS)) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java index 46d6751cc10..464bcae77c4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java @@ -519,4 +519,11 @@ public class MppSharedCommonConfig implements CommonConfig { cnConfig.setPipeConnectorRequestSliceThresholdBytes(pipeConnectorRequestSliceThresholdBytes); return this; } + + @Override + public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) { + dnConfig.setQueryMemoryProportion(queryMemoryProportion); + cnConfig.setQueryMemoryProportion(queryMemoryProportion); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java index 4b3994f8458..45c15e4e7a3 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java @@ -365,4 +365,9 @@ public class RemoteCommonConfig implements CommonConfig { int pipeConnectorRequestSliceThresholdBytes) { return this; } + + @Override + public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java index f6ad62dacff..fb8e2976817 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java @@ -161,4 +161,6 @@ public interface CommonConfig { CommonConfig setPipeConnectorRequestSliceThresholdBytes( int pipeConnectorRequestSliceThresholdBytes); + + CommonConfig setQueryMemoryProportion(String queryMemoryProportion); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java new file mode 100644 index 00000000000..a0d0a083701 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Locale; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) +public class IoTDBFileTimeIndexIT { + + private static final String[] sqls = + new String[] { + "insert into root.db.d1(time,s1) values(2,2)", + "insert into root.db.d1(time,s1) values(3,3)", + "flush", + "insert into root.db.d2(time,s1) values(5,5)", + "flush", + "insert into root.db.d1(time,s1) values(4,4)", + "flush", + "insert into root.db.d2(time,s1) values(1,1)", + "insert into root.db.d1(time,s1) values(3,30)", + "insert into root.db.d1(time,s1) values(4,40)", + "flush", + "insert into root.db.d2(time,s1) values(2,2)", + "insert into root.db.d1(time,s1) values(4,400)", + "flush", + }; + + @BeforeClass + public static void setUp() throws Exception { + Locale.setDefault(Locale.ENGLISH); + + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionGroupExtensionPolicy("CUSTOM") + .setDefaultDataRegionGroupNumPerDatabase(1) + .setEnableSeqSpaceCompaction(false) + .setEnableUnseqSpaceCompaction(false) + .setEnableCrossSpaceCompaction(false) + .setQueryMemoryProportion("1:100:200:50:200:200:0:250"); + // Adjust memstable threshold size to make it flush automatically + EnvFactory.getEnv().initClusterEnvironment(); + prepareData(); + } + + private static void prepareData() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + + for (String sql : sqls) { + statement.addBatch(sql); + } + statement.executeBatch(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testQuery() throws SQLException { + long[] time = {2L, 3L, 4L}; + double[] value = {2.0f, 30.0f, 400.0f}; + + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("select s1 from root.db.d1")) { + int cnt = 0; + while (resultSet.next()) { + assertEquals(time[cnt], resultSet.getLong(1)); + assertEquals(value[cnt], resultSet.getDouble(2), 0.00001); + cnt++; + } + assertEquals(time.length, cnt); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index c46f1e9ca5b..cf7e633ec26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -63,6 +63,7 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.PriorityQueue; import java.util.function.ToLongFunction; @@ -1158,17 +1159,27 @@ public class SeriesScanUtil implements Accountable { unpackUnseqTsFileResource(); } while (orderUtils.hasNextSeqResource() && orderUtils.isCurSeqOverlappedWith(endpointTime)) { - unpackSeqTsFileResource(); + Optional<ITimeSeriesMetadata> timeSeriesMetadata = unpackSeqTsFileResource(); + // asc: if current seq tsfile's endTime >= endpointTime, we don't need to continue + // desc: if current seq tsfile's startTime <= endpointTime, we don't need to continue + if (timeSeriesMetadata.isPresent() + && orderUtils.overlappedSeqResourceSearchingNeedStop( + endpointTime, timeSeriesMetadata.get().getStatistics())) { + break; + } } } - private void unpackSeqTsFileResource() throws IOException { + private Optional<ITimeSeriesMetadata> unpackSeqTsFileResource() throws IOException { ITimeSeriesMetadata timeseriesMetadata = loadTimeSeriesMetadata(orderUtils.getNextSeqFileResource(true), true); // skip if data type is mismatched which may be caused by delete if (timeseriesMetadata != null && timeseriesMetadata.typeMatch(getTsDataTypeList())) { timeseriesMetadata.setSeq(true); seqTimeSeriesMetadata.add(timeseriesMetadata); + return Optional.of(timeseriesMetadata); + } else { + return Optional.empty(); } } @@ -1336,6 +1347,9 @@ public class SeriesScanUtil implements Accountable { TsFileResource getNextUnseqFileResource(boolean isDelete); void setCurSeqFileIndex(QueryDataSource dataSource); + + boolean overlappedSeqResourceSearchingNeedStop( + long endPointTime, Statistics<? extends Object> currentStatistics); } class DescTimeOrderUtils implements TimeOrderUtils { @@ -1454,6 +1468,12 @@ public class SeriesScanUtil implements Accountable { public void setCurSeqFileIndex(QueryDataSource dataSource) { curSeqFileIndex = dataSource.getSeqResourcesSize() - 1; } + + @Override + public boolean overlappedSeqResourceSearchingNeedStop( + long endPointTime, Statistics<?> currentStatistics) { + return currentStatistics.getStartTime() <= endPointTime; + } } class AscTimeOrderUtils implements TimeOrderUtils { @@ -1572,6 +1592,12 @@ public class SeriesScanUtil implements Accountable { public void setCurSeqFileIndex(QueryDataSource dataSource) { curSeqFileIndex = 0; } + + @Override + public boolean overlappedSeqResourceSearchingNeedStop( + long endPointTime, Statistics<?> currentStatistics) { + return currentStatistics.getEndTime() >= endPointTime; + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java index 704bdeb6902..10f843c8173 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java @@ -45,7 +45,10 @@ public class QueryDataSource implements IQueryDataSource { private int curSeqIndex = -1; - // asc: startTime; desc: endTime + // asc: startTime, will be Long.MIN_VALUE if current tsfile resource is degraded + // desc: endTime, will be Long.MAX_VALUE if current tsfile resource is degraded + // if current tsfile resource is degraded, it will always be considered to be overlapping with + // current point private long curSeqOrderTime = 0; private Boolean curSeqSatisfied = null; @@ -109,7 +112,7 @@ public class QueryDataSource implements IQueryDataSource { boolean res = ascending ? curIndex < seqResources.size() : curIndex >= 0; if (res && curIndex != this.curSeqIndex) { this.curSeqIndex = curIndex; - this.curSeqOrderTime = seqResources.get(curIndex).getOrderTime(deviceID, ascending); + this.curSeqOrderTime = seqResources.get(curIndex).getOrderTimeForSeq(deviceID, ascending); this.curSeqSatisfied = null; } return res; @@ -151,7 +154,9 @@ public class QueryDataSource implements IQueryDataSource { if (res && curIndex != this.curUnSeqIndex) { this.curUnSeqIndex = curIndex; this.curUnSeqOrderTime = - unseqResources.get(unSeqFileOrderIndex[curIndex]).getOrderTime(deviceID, ascending); + unseqResources + .get(unSeqFileOrderIndex[curIndex]) + .getOrderTimeForUnseq(deviceID, ascending); this.curUnSeqSatisfied = null; } return res; @@ -208,7 +213,8 @@ public class QueryDataSource implements IQueryDataSource { int index = 0; for (TsFileResource resource : unseqResources) { orderTimeToIndexMap - .computeIfAbsent(resource.getOrderTime(deviceId, ascending), key -> new ArrayList<>()) + .computeIfAbsent( + resource.getOrderTimeForUnseq(deviceId, ascending), key -> new ArrayList<>()) .add(index++); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 8c3d8e76a85..ed681abe8ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -601,7 +601,17 @@ public class TsFileResource implements PersistentResource { } } - public long getOrderTime(IDeviceID deviceId, boolean ascending) { + // cannot use FileTimeIndex + public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending) { + if (timeIndex instanceof ArrayDeviceTimeIndex) { + return ascending ? getStartTime(deviceId) : getEndTime(deviceId); + } else { + return ascending ? Long.MIN_VALUE : Long.MAX_VALUE; + } + } + + // can use FileTimeIndex + public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending) { return ascending ? getStartTime(deviceId) : getEndTime(deviceId); }
