This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 635f9101de5 [IOTDB-6355] [To dev/1.3] Fix query scan returning
duplicated or unordered timestamps while TsFileResource is degraded
635f9101de5 is described below
commit 635f9101de58798559cd9a8b4880be977752f5ea
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Dec 17 16:23:37 2024 +0800
[IOTDB-6355] [To dev/1.3] Fix query scan returning duplicated or unordered
timestamps while TsFileResource is degraded
---
.../it/env/cluster/config/MppCommonConfig.java | 6 ++
.../env/cluster/config/MppSharedCommonConfig.java | 7 ++
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../apache/iotdb/db/it/IoTDBFileTimeIndexIT.java | 117 +++++++++++++++++++++
.../execution/operator/source/SeriesScanUtil.java | 30 +++++-
.../dataregion/read/QueryDataSource.java | 14 ++-
.../dataregion/tsfile/TsFileResource.java | 12 ++-
8 files changed, 186 insertions(+), 7 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 513c8f6ff8b..4edf6dd9420 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -484,6 +484,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
+ setProperty("chunk_timeseriesmeta_free_memory_proportion",
queryMemoryProportion);
+ return this;
+ }
+
// For part of the log directory
public String getClusterConfigStr() {
return
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 6e0115fc4e2..700ca3821c3 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -491,4 +491,11 @@ public class MppSharedCommonConfig implements CommonConfig
{
cnConfig.setPipeConnectorRequestSliceThresholdBytes(pipeConnectorRequestSliceThresholdBytes);
return this;
}
+
+ @Override
+ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
+ dnConfig.setQueryMemoryProportion(queryMemoryProportion);
+ cnConfig.setQueryMemoryProportion(queryMemoryProportion);
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 25489a42d14..52c233556a5 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -345,4 +345,9 @@ public class RemoteCommonConfig implements CommonConfig {
int pipeConnectorRequestSliceThresholdBytes) {
return this;
}
+
+ @Override
+ public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index f08c93089cf..38f5a9d8030 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -153,4 +153,6 @@ public interface CommonConfig {
CommonConfig setPipeConnectorRequestSliceThresholdBytes(
int pipeConnectorRequestSliceThresholdBytes);
+
+ CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java
new file mode 100644
index 00000000000..a0d0a083701
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFileTimeIndexIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBFileTimeIndexIT {
+
+ private static final String[] sqls =
+ new String[] {
+ "insert into root.db.d1(time,s1) values(2,2)",
+ "insert into root.db.d1(time,s1) values(3,3)",
+ "flush",
+ "insert into root.db.d2(time,s1) values(5,5)",
+ "flush",
+ "insert into root.db.d1(time,s1) values(4,4)",
+ "flush",
+ "insert into root.db.d2(time,s1) values(1,1)",
+ "insert into root.db.d1(time,s1) values(3,30)",
+ "insert into root.db.d1(time,s1) values(4,40)",
+ "flush",
+ "insert into root.db.d2(time,s1) values(2,2)",
+ "insert into root.db.d1(time,s1) values(4,400)",
+ "flush",
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Locale.setDefault(Locale.ENGLISH);
+
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataRegionGroupExtensionPolicy("CUSTOM")
+ .setDefaultDataRegionGroupNumPerDatabase(1)
+ .setEnableSeqSpaceCompaction(false)
+ .setEnableUnseqSpaceCompaction(false)
+ .setEnableCrossSpaceCompaction(false)
+ .setQueryMemoryProportion("1:100:200:50:200:200:0:250");
+ // Adjust memtable threshold size to make it flush automatically
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData();
+ }
+
+ private static void prepareData() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ for (String sql : sqls) {
+ statement.addBatch(sql);
+ }
+ statement.executeBatch();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testQuery() throws SQLException {
+ long[] time = {2L, 3L, 4L};
+ double[] value = {2.0f, 30.0f, 400.0f};
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("select s1 from
root.db.d1")) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ assertEquals(time[cnt], resultSet.getLong(1));
+ assertEquals(value[cnt], resultSet.getDouble(2), 0.00001);
+ cnt++;
+ }
+ assertEquals(time.length, cnt);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 0dae7e18cca..c6f79ca6424 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -62,6 +62,7 @@ import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;
@@ -1141,17 +1142,27 @@ public class SeriesScanUtil implements Accountable {
unpackUnseqTsFileResource();
}
while (orderUtils.hasNextSeqResource() &&
orderUtils.isCurSeqOverlappedWith(endpointTime)) {
- unpackSeqTsFileResource();
+ Optional<ITimeSeriesMetadata> timeSeriesMetadata =
unpackSeqTsFileResource();
+ // asc: if current seq tsfile's endTime >= endpointTime, we don't need
to continue
+ // desc: if current seq tsfile's startTime <= endpointTime, we don't
need to continue
+ if (timeSeriesMetadata.isPresent()
+ && orderUtils.overlappedSeqResourceSearchingNeedStop(
+ endpointTime, timeSeriesMetadata.get().getStatistics())) {
+ break;
+ }
}
}
- private void unpackSeqTsFileResource() throws IOException {
+ private Optional<ITimeSeriesMetadata> unpackSeqTsFileResource() throws
IOException {
ITimeSeriesMetadata timeseriesMetadata =
loadTimeSeriesMetadata(orderUtils.getNextSeqFileResource(true), true);
// skip if data type is mismatched which may be caused by delete
if (timeseriesMetadata != null &&
timeseriesMetadata.typeMatch(getTsDataTypeList())) {
timeseriesMetadata.setSeq(true);
seqTimeSeriesMetadata.add(timeseriesMetadata);
+ return Optional.of(timeseriesMetadata);
+ } else {
+ return Optional.empty();
}
}
@@ -1316,6 +1327,9 @@ public class SeriesScanUtil implements Accountable {
TsFileResource getNextUnseqFileResource(boolean isDelete);
void setCurSeqFileIndex(QueryDataSource dataSource);
+
+ boolean overlappedSeqResourceSearchingNeedStop(
+ long endPointTime, Statistics<? extends Object> currentStatistics);
}
class DescTimeOrderUtils implements TimeOrderUtils {
@@ -1434,6 +1448,12 @@ public class SeriesScanUtil implements Accountable {
public void setCurSeqFileIndex(QueryDataSource dataSource) {
curSeqFileIndex = dataSource.getSeqResourcesSize() - 1;
}
+
+ @Override
+ public boolean overlappedSeqResourceSearchingNeedStop(
+ long endPointTime, Statistics<?> currentStatistics) {
+ return currentStatistics.getStartTime() <= endPointTime;
+ }
}
class AscTimeOrderUtils implements TimeOrderUtils {
@@ -1552,6 +1572,12 @@ public class SeriesScanUtil implements Accountable {
public void setCurSeqFileIndex(QueryDataSource dataSource) {
curSeqFileIndex = 0;
}
+
+ @Override
+ public boolean overlappedSeqResourceSearchingNeedStop(
+ long endPointTime, Statistics<?> currentStatistics) {
+ return currentStatistics.getEndTime() >= endPointTime;
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index ead12a7e961..0bc85d4cbad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -45,7 +45,10 @@ public class QueryDataSource implements IQueryDataSource {
private int curSeqIndex = -1;
- // asc: startTime; desc: endTime
+ // asc: startTime, will be Long.MIN_VALUE if current tsfile resource is
degraded
+ // desc: endTime, will be Long.MAX_VALUE if current tsfile resource is
degraded
+ // if current tsfile resource is degraded, it will always be considered to
be overlapping with
+ // current point
private long curSeqOrderTime = 0;
private Boolean curSeqSatisfied = null;
@@ -98,7 +101,7 @@ public class QueryDataSource implements IQueryDataSource {
boolean res = ascending ? curIndex < seqResources.size() : curIndex >= 0;
if (res && curIndex != this.curSeqIndex) {
this.curSeqIndex = curIndex;
- this.curSeqOrderTime = seqResources.get(curIndex).getOrderTime(deviceID,
ascending);
+ this.curSeqOrderTime =
seqResources.get(curIndex).getOrderTimeForSeq(deviceID, ascending);
this.curSeqSatisfied = null;
}
return res;
@@ -140,7 +143,9 @@ public class QueryDataSource implements IQueryDataSource {
if (res && curIndex != this.curUnSeqIndex) {
this.curUnSeqIndex = curIndex;
this.curUnSeqOrderTime =
-
unseqResources.get(unSeqFileOrderIndex[curIndex]).getOrderTime(deviceID,
ascending);
+ unseqResources
+ .get(unSeqFileOrderIndex[curIndex])
+ .getOrderTimeForUnseq(deviceID, ascending);
this.curUnSeqSatisfied = null;
}
return res;
@@ -194,7 +199,8 @@ public class QueryDataSource implements IQueryDataSource {
int index = 0;
for (TsFileResource resource : unseqResources) {
orderTimeToIndexMap
- .computeIfAbsent(resource.getOrderTime(deviceId, ascending), key ->
new ArrayList<>())
+ .computeIfAbsent(
+ resource.getOrderTimeForUnseq(deviceId, ascending), key -> new
ArrayList<>())
.add(index++);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 10d9a89ff85..8b446221b3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -441,7 +441,17 @@ public class TsFileResource {
}
}
- public long getOrderTime(IDeviceID deviceId, boolean ascending) {
+ // cannot use FileTimeIndex
+ public long getOrderTimeForSeq(IDeviceID deviceId, boolean ascending) {
+ if (timeIndex instanceof DeviceTimeIndex) {
+ return ascending ? getStartTime(deviceId) : getEndTime(deviceId);
+ } else {
+ return ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
+ }
+ }
+
+ // can use FileTimeIndex
+ public long getOrderTimeForUnseq(IDeviceID deviceId, boolean ascending) {
return ascending ? getStartTime(deviceId) : getEndTime(deviceId);
}